From 0fe933e23d026fce6fd065f87e66c2637693e963 Mon Sep 17 00:00:00 2001 From: Jirka Borovec Date: Tue, 28 Jul 2020 01:07:09 +0200 Subject: [PATCH] fixing TPU tests (#2632) * init * rename * tpu_core_idx * idx 8 * idxs * @pl_multi_process_test * assert * assert * deamon * no close * imort * msg * use_single_gpu * dataset * idx * fix idx * dataset * format * add pickable * typo * apex * typo * wip * wip * wip * wip * wip * wip * wip * wip * docs * typo * tests * tests * tests * tests * tests * tests * tests * tests * tests * tests * tests * tests * tests * tests * tests * tests * tests * docs * docs * Apply suggestions from code review Co-authored-by: Ethan Harris Co-authored-by: Rohit Gupta * Apply suggestions from code review Co-authored-by: Ethan Harris * docs * Apply suggestions from code review Co-authored-by: Rohit Gupta Co-authored-by: Ethan Harris Co-authored-by: Rohit Gupta --- .circleci/config.yml | 8 +- .github/workflows/ci-testing.yml | 4 + .github/workflows/tpu-testing.yml | 12 +- CHANGELOG.md | 2 + docs/source/new-project.rst | 2 - pytorch_lightning/__init__.py | 2 +- .../accelerator_backends/ddp_spawn_backend.py | 19 +- .../accelerator_backends/gpu_backend.py | 5 +- .../accelerator_backends/tpu_backend.py | 105 +++++++---- pytorch_lightning/core/__init__.py | 6 +- pytorch_lightning/core/decorators.py | 2 - .../trainer/distrib_data_parallel.py | 44 +++-- pytorch_lightning/trainer/distrib_parts.py | 5 +- pytorch_lightning/trainer/evaluation_loop.py | 4 +- pytorch_lightning/trainer/trainer.py | 25 +-- pytorch_lightning/trainer/training_loop.py | 4 +- tests/base/datasets.py | 26 ++- tests/base/develop_utils.py | 14 +- tests/base/test_datasets.py | 19 ++ tests/models/test_grad_norm.py | 4 +- tests/models/test_tpu.py | 169 ++++++++++++------ tests/trainer/test_trainer.py | 36 ++-- .../test_trainer_test_loop.py} | 14 +- 23 files changed, 339 insertions(+), 192 deletions(-) create mode 100644 tests/base/test_datasets.py rename tests/{models/test_test_loop.py => trainer/test_trainer_test_loop.py} (95%) diff --git a/.circleci/config.yml b/.circleci/config.yml index a515ef544f565..ba24d3bb82086 100755 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -62,10 +62,11 @@ references: # happened to the job in Kubernetes. If we try MAX_CHECKS times and # still the job hasn't finished, give up and return the starting # non-zero status code. - while [ $i -lt $MAX_CHECKS ]; do ((i++)); if kubectl get jobs $job_name -o jsonpath='Failed:{.status.failed}' | grep "Failed:1"; then status_code=1 && break; elif kubectl get jobs $job_name -o jsonpath='Succeeded:{.status.succeeded}' | grep "Succeeded:1" ; then status_code=0 && break; else echo "Job not finished yet"; fi; sleep 30; done && \ + printf "Waiting for job to finish: " && \ + while [ $i -lt $MAX_CHECKS ]; do ((i++)); if kubectl get jobs $job_name -o jsonpath='Failed:{.status.failed}' | grep "Failed:1"; then status_code=1 && break; elif kubectl get jobs $job_name -o jsonpath='Succeeded:{.status.succeeded}' | grep "Succeeded:1" ; then status_code=0 && break; else printf "."; fi; sleep $CHECK_SPEEP; done && \ echo "Done waiting. Job status code: $status_code" && \ # Allow time for logs to flush. 
- sleep 30 && \ + sleep 10 && \ echo "JOB_NAME: $job_name" && \ gcloud logging read "resource.type=k8s_container resource.labels.project_id=$GOOGLE_PROJECT_ID resource.labels.location=$GOOGLE_COMPUTE_ZONE resource.labels.cluster_name=$GKE_CLUSTER resource.labels.namespace_name=default resource.labels.pod_name:$job_name" --limit 10000000 --order asc --format 'value(textPayload)' --project=$GOOGLE_PROJECT_ID > /tmp/full_output.txt && \ if grep -q '' /tmp/full_output.txt ; then csplit /tmp/full_output.txt '//'; else mv /tmp/full_output.txt xx00; fi && \ @@ -101,7 +102,8 @@ jobs: docker: - image: circleci/python:3.7 environment: - - MAX_CHECKS: 60 + - MAX_CHECKS: 240 + - CHECK_SPEEP: 5 steps: - checkout - go/install diff --git a/.github/workflows/ci-testing.yml b/.github/workflows/ci-testing.yml index 7c3095fc281a1..f08e54d136ef2 100644 --- a/.github/workflows/ci-testing.yml +++ b/.github/workflows/ci-testing.yml @@ -30,6 +30,10 @@ jobs: # TODO: temporary fix till hanging jobs on macOS for py38 is resolved - python-version: 3.8 os: macOS-10.15 + # TODO: temporary fix till pyYaml can be installed, see: https://github.com/actions/setup-python/issues/114 + - python-version: 3.7 + os: ubuntu-18.04 + requires: 'minimal' # Timeout: https://stackoverflow.com/a/59076067/4521646 timeout-minutes: 25 diff --git a/.github/workflows/tpu-testing.yml b/.github/workflows/tpu-testing.yml index d4ab768a885dd..245e424181007 100644 --- a/.github/workflows/tpu-testing.yml +++ b/.github/workflows/tpu-testing.yml @@ -14,6 +14,8 @@ env: GKE_CLUSTER: lightning-cluster GKE_ZONE: us-central1-a IMAGE: gcr.io/${{ secrets.GKE_PROJECT }}/tpu-testing-image + MAX_CHECKS: 240 + CHECK_SPEEP: 5 jobs: setup-build-publish-deploy: @@ -82,17 +84,17 @@ jobs: job_name=${job_name% created} && \ echo "Waiting on kubernetes job: $job_name in cluster: $GKE_CLUSTER" && \ i=0 && \ - # 30 checks spaced 30s apart = 900s total. - max_checks=30 && \ + # 60 checks spaced 30s apart = 900s total. status_code=2 && \ # Check on the job periodically. Set the status code depending on what - # happened to the job in Kubernetes. If we try max_checks times and + # happened to the job in Kubernetes. If we try MAX_CHECKS times and # still the job hasn't finished, give up and return the starting # non-zero status code. - while [ $i -lt $max_checks ]; do ((i++)); if kubectl get jobs $job_name -o jsonpath='Failed:{.status.failed}' | grep "Failed:1"; then status_code=1 && break; elif kubectl get jobs $job_name -o jsonpath='Succeeded:{.status.succeeded}' | grep "Succeeded:1" ; then status_code=0 && break; else echo "Job not finished yet"; fi; sleep 30; done && \ + printf "Waiting for job to finish: " && \ + while [ $i -lt $MAX_CHECKS ]; do ((i++)); if kubectl get jobs $job_name -o jsonpath='Failed:{.status.failed}' | grep "Failed:1"; then status_code=1 && break; elif kubectl get jobs $job_name -o jsonpath='Succeeded:{.status.succeeded}' | grep "Succeeded:1" ; then status_code=0 && break; else printf "." ; fi; sleep $CHECK_SPEEP; done && \ echo "Done waiting. Job status code: $status_code" && \ # Allow time for logs to flush. - sleep 60 && \ + sleep 10 && \ echo "JOB_NAME: $job_name" && \ echo "GKE_CLUSTER: $GKE_CLUSTER" && \ echo "GKE_ZONE: $GKE_ZONE" && \ diff --git a/CHANGELOG.md b/CHANGELOG.md index af692fdd0f938..e65efa50cb38d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). 
- Fixed `weights_save_path` getting ignored when `logger=False` is passed to Trainer ([#2681](https://github.com/PyTorchLightning/pytorch-lightning/pull/2681)) +- Fixed TPU multi-core and Float16 ([#2632](https://github.com/PyTorchLightning/pytorch-lightning/pull/2632)) + ## [0.8.5] - 2020-07-09 ### Added diff --git a/docs/source/new-project.rst b/docs/source/new-project.rst index 043461fca0b89..dd28bc311d401 100644 --- a/docs/source/new-project.rst +++ b/docs/source/new-project.rst @@ -6,8 +6,6 @@ import torch from torch.nn import functional as F from torch.utils.data import DataLoader - from torchvision.datasets import MNIST - from torchvision import transforms Quick Start diff --git a/pytorch_lightning/__init__.py b/pytorch_lightning/__init__.py index ee49879ee3665..5cb42370c28db 100644 --- a/pytorch_lightning/__init__.py +++ b/pytorch_lightning/__init__.py @@ -54,11 +54,11 @@ # We are not importing the rest of the lightning during the build process, as it may not be compiled yet else: from pytorch_lightning.core import LightningDataModule, LightningModule, data_loader + from pytorch_lightning.core.step_result import TrainResult, EvalResult from pytorch_lightning.callbacks import Callback from pytorch_lightning.trainer import Trainer from pytorch_lightning.utilities.seed import seed_everything from pytorch_lightning import metrics - from pytorch_lightning.core.step_result import TrainResult, EvalResult __all__ = [ 'Trainer', diff --git a/pytorch_lightning/accelerator_backends/ddp_spawn_backend.py b/pytorch_lightning/accelerator_backends/ddp_spawn_backend.py index 7cd05fea87ca4..6aee68f6634f2 100644 --- a/pytorch_lightning/accelerator_backends/ddp_spawn_backend.py +++ b/pytorch_lightning/accelerator_backends/ddp_spawn_backend.py @@ -30,26 +30,27 @@ class DDPSpawnBackend(object): def __init__(self, trainer): self.trainer = trainer - self.q = None + self.mp_queue = None def setup(self): self.trainer.set_random_port() # pass in a state q smp = mp.get_context('spawn') - self.q = smp.SimpleQueue() + self.mp_queue = smp.SimpleQueue() def train(self, model, nprocs): - mp.spawn(self.ddp_train, nprocs=nprocs, args=(self.q, model,)) + mp.spawn(self.ddp_train, nprocs=nprocs, args=(self.mp_queue, model,)) def teardown(self, model): # restore main state with best weights - best_path = self.q.get() - results = self.q.get() - last_path = self.q.get() + best_path = self.mp_queue.get() + results = self.mp_queue.get() + last_path = self.mp_queue.get() # transfer back the best path to the trainer self.trainer.checkpoint_callback.best_model_path = best_path + # todo, pass also bets score # load last weights if last_path is not None and not self.trainer.testing: @@ -59,13 +60,13 @@ def teardown(self, model): self.trainer.model = model return results - def ddp_train(self, process_idx, q, model, is_master=False, proc_offset=0): + def ddp_train(self, process_idx, mp_queue, model, is_master=False, proc_offset=0): """ Entry point for ddp Args: process_idx: - q: + mp_queue: multiprocessing queue model: is_master: proc_offset: @@ -166,7 +167,7 @@ def ddp_train(self, process_idx, q, model, is_master=False, proc_offset=0): model = self.trainer.get_model() # persist info in ddp_spawn - self.trainer.transfer_ddp_spawn_state_on_fit_end(model, q, results) + self.trainer.transfer_distrib_spawn_state_on_fit_end(model, mp_queue, results) # clean up memory torch.cuda.empty_cache() diff --git a/pytorch_lightning/accelerator_backends/gpu_backend.py b/pytorch_lightning/accelerator_backends/gpu_backend.py index 
81128e1009425..3b5f37671d9e8 100644 --- a/pytorch_lightning/accelerator_backends/gpu_backend.py +++ b/pytorch_lightning/accelerator_backends/gpu_backend.py @@ -14,6 +14,7 @@ import torch +from pytorch_lightning.core import LightningModule try: from apex import amp except ImportError: @@ -45,7 +46,7 @@ def setup(self, model): # TODO: remove with dropping NVIDIA AMP support native_amp_available = hasattr(torch.cuda, "amp") and hasattr(torch.cuda.amp, "autocast") - if self.trainer.use_amp and not native_amp_available: + if APEX_AVAILABLE and self.trainer.use_amp and not native_amp_available: model = self._setup_nvidia_apex(model) return model @@ -53,7 +54,7 @@ def train(self, model): results = self.trainer.run_pretrain_routine(model) return results - def _setup_nvidia_apex(self, model): + def _setup_nvidia_apex(self, model: LightningModule): model, optimizers = model.configure_apex(amp, model, self.trainer.optimizers, self.trainer.amp_level) self.trainer.optimizers = optimizers self.trainer.reinit_scheduler_properties(self.trainer.optimizers, self.trainer.lr_schedulers) diff --git a/pytorch_lightning/accelerator_backends/tpu_backend.py b/pytorch_lightning/accelerator_backends/tpu_backend.py index 27b6ce0f92416..8d1d1b271b7dc 100644 --- a/pytorch_lightning/accelerator_backends/tpu_backend.py +++ b/pytorch_lightning/accelerator_backends/tpu_backend.py @@ -13,11 +13,15 @@ # limitations under the License. import os + +import torch +import torch.multiprocessing as mp + +from pytorch_lightning.core import LightningModule from pytorch_lightning.utilities import rank_zero_info, rank_zero_only, rank_zero_warn from pytorch_lightning.utilities.exceptions import MisconfigurationException from pytorch_lightning import _logger as log - try: import torch_xla import torch_xla.core.xla_model as xm @@ -33,31 +37,52 @@ class TPUBackend(object): def __init__(self, trainer): self.trainer = trainer self.start_method = None + self.mp_queue = None def setup(self): rank_zero_info(f'training on {self.trainer.tpu_cores} TPU cores') if not XLA_AVAILABLE: - raise MisconfigurationException('No TPU devices found.') + raise MisconfigurationException('PyTorch XLA not installed.') + + # see: https://discuss.pytorch.org/t/segfault-with-multiprocessing-queue/81292/2 + self.start_method = 'fork' + + # pass in a state q + smp = mp.get_context(self.start_method) + self.mp_queue = smp.SimpleQueue() + + def teardown(self, model): + # restore main state with best weights + best_path = self.mp_queue.get() + results = self.mp_queue.get() + last_path = self.mp_queue.get() - # COLAB_GPU is an env var available by default in Colab environments. 
- self.start_method = 'fork' if self.trainer.on_colab_kaggle else 'spawn' + # transfer back the best path to the trainer + self.trainer.checkpoint_callback.best_model_path = best_path + # todo, pass also bets score - def teardown(self): + # load last weights + if last_path and not self.trainer.testing: + ckpt = torch.load(last_path, map_location=lambda storage, loc: storage) + model.load_state_dict(ckpt) + + self.trainer.model = model # when training completes, load the weights back in main process self.__load_weights_on_main_process() + return results - def train(self, model): + def train(self, model: LightningModule): self.trainer.model = model # train if self.trainer.tpu_id is not None: - self.tpu_train_in_process(self.trainer.tpu_id, model) + self.tpu_train_in_process(self.trainer.tpu_id, model, self.trainer, self.mp_queue) else: xmp.spawn( self.tpu_train_in_process, - args=(model,), + args=(model, self.trainer, self.mp_queue), nprocs=self.trainer.tpu_cores, start_method=self.start_method ) @@ -71,63 +96,69 @@ def __load_weights_on_main_process(self): self.trainer.model = model - def tpu_train_in_process(self, tpu_core_idx, model): + def tpu_train_in_process(self, tpu_core_idx: int, model: LightningModule, trainer=None, mp_queue=None): """ Here we are inside each individual process """ - if not self.trainer.testing: - self.trainer.setup('fit') + if not trainer: + trainer = self.trainer + if not trainer.testing: + trainer.setup('fit') model.setup('fit') # setup TPU training - self.__setup_tpu_training(model) + self.__setup_tpu_training(model, trainer) # Run the pretrain routine - self.trainer.run_pretrain_routine(model) + results = trainer.run_pretrain_routine(model) # save weights at the end of training - self.__save_end_of_training_weights(model) + self.__save_end_of_training_weights(model, trainer) - def __save_end_of_training_weights(self, model): + # persist info in spawn + trainer.transfer_distrib_spawn_state_on_fit_end(model, mp_queue, results) + def __save_end_of_training_weights(self, model: LightningModule, trainer): # when training ends on these platforms dump weights to get out of the main process - if self.trainer.on_colab_kaggle: + if trainer.on_colab_kaggle: rank_zero_warn('cleaning up... 
please do not interrupt') - self.trainer.save_spawn_weights(model) + trainer.save_spawn_weights(model) - def __setup_tpu_training(self, model): + def __setup_tpu_training(self, model: LightningModule, trainer): # use the default device from the process - tpu_device = xm.xla_device() + # tpu_device = xm.xla_device() # if given an ordinal device, use this as the device - if self.trainer.tpu_id is not None: - tpu_device = xm.xla_device(self.trainer.tpu_id) - + if trainer.tpu_id is not None: + tpu_device = xm.xla_device(trainer.tpu_id) + else: + tpu_device = xm.xla_device() # track the device and move model to it - self.trainer._device = tpu_device - model.to(self.trainer._device) + trainer._device = tpu_device + model.to(trainer._device) # get the appropriate tpu ranks - self.trainer.tpu_local_core_rank = xm.get_local_ordinal() - self.trainer.tpu_global_core_rank = xm.get_ordinal() + trainer.tpu_local_core_rank = xm.get_local_ordinal() + trainer.tpu_global_core_rank = xm.get_ordinal() # avoid duplicating progress bar - if self.trainer.tpu_global_core_rank != 0 and self.trainer.progress_bar_callback is not None: - self.trainer.progress_bar_callback.disable() + if trainer.tpu_global_core_rank != 0 and trainer.progress_bar_callback is not None: + trainer.progress_bar_callback.disable() - self.trainer.global_rank = self.trainer.tpu_local_core_rank - rank_zero_only.rank = self.trainer.global_rank + trainer.global_rank = trainer.tpu_local_core_rank + rank_zero_only.rank = trainer.global_rank # CHOOSE OPTIMIZER # allow for lr schedulers as well - optimizers, lr_schedulers, optimizer_frequencies = self.trainer.init_optimizers(model) - self.trainer.optimizers = optimizers - self.trainer.lr_schedulers = lr_schedulers - self.trainer.optimizer_frequencies = optimizer_frequencies + optimizers, lr_schedulers, optimizer_frequencies = trainer.init_optimizers(model) + trainer.optimizers = optimizers + trainer.lr_schedulers = lr_schedulers + trainer.optimizer_frequencies = optimizer_frequencies # init 16 bit for TPU - if self.trainer.precision == 16: + if trainer.precision == 16: os.environ['XLA_USE_BF16'] = str(1) - log.info(f'INIT TPU local core: {self.trainer.tpu_local_core_rank},' - f' global rank: {self.trainer.tpu_global_core_rank}') + log.info(f'INIT TPU local core: {trainer.tpu_local_core_rank},' + f' global rank: {trainer.tpu_global_core_rank}' + f' with XLA_USE_BF16={os.environ.get("XLA_USE_BF16")}') diff --git a/pytorch_lightning/core/__init__.py b/pytorch_lightning/core/__init__.py index 8a7770752f951..43861ad6dd785 100644 --- a/pytorch_lightning/core/__init__.py +++ b/pytorch_lightning/core/__init__.py @@ -305,5 +305,9 @@ def training_step(self, batch, batch_idx): from pytorch_lightning.core.decorators import data_loader from pytorch_lightning.core.lightning import LightningModule -__all__ = ['LightningDataModule', 'LightningModule', 'data_loader'] +__all__ = [ + 'LightningDataModule', + 'LightningModule', + 'data_loader', +] # __call__ = __all__ diff --git a/pytorch_lightning/core/decorators.py b/pytorch_lightning/core/decorators.py index b540c1b66bab3..6023de27d853b 100644 --- a/pytorch_lightning/core/decorators.py +++ b/pytorch_lightning/core/decorators.py @@ -1,8 +1,6 @@ from functools import wraps from typing import Callable -import torch - from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.utilities import rank_zero_warn diff --git a/pytorch_lightning/trainer/distrib_data_parallel.py b/pytorch_lightning/trainer/distrib_data_parallel.py index 
63c6c9b8513ab..2546ec7bb3c7d 100644 --- a/pytorch_lightning/trainer/distrib_data_parallel.py +++ b/pytorch_lightning/trainer/distrib_data_parallel.py @@ -171,16 +171,14 @@ def train_fx(trial_hparams, cluster_manager, _): try: import torch_xla - import torch_xla.core.xla_model as xm - import torch_xla.distributed.xla_multiprocessing as xmp except ImportError: XLA_AVAILABLE = False else: XLA_AVAILABLE = True -pid = os.getpid() -rng1 = np.random.RandomState(pid) -RANDOM_PORTS = rng1.randint(10000, 19999, 1000) +PID = os.getpid() +RNG1 = np.random.RandomState(PID) +RANDOM_PORTS = RNG1.randint(10000, 19999, 1000) class TrainerDDPMixin(ABC): @@ -253,7 +251,7 @@ def is_function_implemented(self, *args) -> bool: def init_tpu(self): # turn off all the GPU stuff - self.distributed_backend = None + # self.distributed_backend = 'tpu' # enable tpu self.use_tpu = True @@ -263,7 +261,7 @@ def set_distributed_mode(self, distributed_backend): self.use_ddp = False self.use_ddp2 = False self.use_horovod = False - self.single_gpu = False + self.use_single_gpu = False if distributed_backend is None: if self.has_horovodrun(): @@ -272,7 +270,7 @@ def set_distributed_mode(self, distributed_backend): if self.num_nodes > 1 or self.num_processes > 1: self.use_ddp = True # ddp_cpu elif self.num_gpus == 1: - self.single_gpu = True + self.use_single_gpu = True elif self.num_gpus > 1: rank_zero_warn('You requested multiple GPUs but did not specify a backend, e.g.' ' Trainer(distributed_backend=dp) (or ddp, ddp2).' @@ -283,7 +281,7 @@ def set_distributed_mode(self, distributed_backend): if distributed_backend == "dp": # do nothing if num_gpus == 0 if self.num_gpus == 1: - self.single_gpu = True + self.use_single_gpu = True self.use_dp = True elif self.num_gpus > 1: self.use_dp = True @@ -293,7 +291,7 @@ def set_distributed_mode(self, distributed_backend): if self.num_nodes > 1 or self.num_processes > 1: self.use_ddp = True # ddp_cpu elif self.num_gpus == 1: - self.single_gpu = True + self.use_single_gpu = True self.use_ddp = True elif self.num_gpus > 1: self.use_ddp = True @@ -364,7 +362,6 @@ def configure_slurm_ddp(self, num_gpu_nodes): def determine_local_rank(self): if self.is_slurm_managing_tasks: return int(os.environ['SLURM_LOCALID']) - else: return int(os.environ.get('LOCAL_RANK', 0)) @@ -476,18 +473,18 @@ def spawn_ddp_children(self, model): sleep(delay) local_rank = 0 - results = self.ddp_train(local_rank, q=None, model=model, is_master=True) + results = self.ddp_train(local_rank, mp_queue=None, model=model, is_master=True) del os.environ['WORLD_SIZE'] return results - def ddp_train(self, process_idx, q, model, is_master=False, proc_offset=0): + def ddp_train(self, process_idx, mp_queue, model, is_master=False, proc_offset=0): """ Entry point for ddp Args: process_idx: - q: + mp_queue: multiprocessing queue model: is_master: proc_offset: @@ -580,7 +577,7 @@ def ddp_train(self, process_idx, q, model, is_master=False, proc_offset=0): model = self.get_model() # persist info in ddp_spawn - self.transfer_ddp_spawn_state_on_fit_end(model, q, results) + self.transfer_distrib_spawn_state_on_fit_end(model, mp_queue, results) # clean up memory torch.cuda.empty_cache() @@ -588,8 +585,8 @@ def ddp_train(self, process_idx, q, model, is_master=False, proc_offset=0): if self.global_rank == 0 and self.distributed_backend not in ['ddp_spawn', 'ddp_cpu']: return results - def transfer_ddp_spawn_state_on_fit_end(self, model, q, results): - if self.distributed_backend not in ['ddp_spawn', 'ddp_cpu', 'tpu']: + def 
transfer_distrib_spawn_state_on_fit_end(self, model, mp_queue, results): + if self.distributed_backend.lower() not in ['ddp_spawn', 'ddp_cpu', 'tpu']: return # track the best model path @@ -597,17 +594,18 @@ def transfer_ddp_spawn_state_on_fit_end(self, model, q, results): if self.checkpoint_callback is not None: best_model_path = self.checkpoint_callback.best_model_path - if self.global_rank == 0 and q is not None: + if self.global_rank == 0 and mp_queue is not None: rank_zero_warn('cleaning up ddp environment...') - q.put(best_model_path) - q.put(results) + # todo, pass complete checkpoint as state dictionary + mp_queue.put(best_model_path) + mp_queue.put(results) # save the last weights last_path = None if not self.testing and best_model_path is not None and len(best_model_path) > 0: last_path = re.sub('.ckpt', '.tmp_end.ckpt', best_model_path) torch.save(model.state_dict(), last_path) - q.put(last_path) + mp_queue.put(last_path) def save_spawn_weights(self, model): """ @@ -616,7 +614,7 @@ def save_spawn_weights(self, model): :return: """ if self.is_global_zero: - path = os.path.join(self.default_root_dir, '__temp_weight_ddp_end.ckpt') + path = os.path.join(self.default_root_dir, '__temp_weight_distributed_end.ckpt') self.save_checkpoint(path) return path @@ -632,7 +630,7 @@ def load_spawn_weights(self, original_model): if self.is_global_zero: # load weights saved in ddp - path = os.path.join(self.default_root_dir, '__temp_weight_ddp_end.ckpt') + path = os.path.join(self.default_root_dir, '__temp_weight_distributed_end.ckpt') loaded_model = original_model.__class__.load_from_checkpoint(path) # copy loaded weights to old model diff --git a/pytorch_lightning/trainer/distrib_parts.py b/pytorch_lightning/trainer/distrib_parts.py index 3c0461e713832..63db623d91f0b 100644 --- a/pytorch_lightning/trainer/distrib_parts.py +++ b/pytorch_lightning/trainer/distrib_parts.py @@ -69,7 +69,7 @@ class TrainerDPMixin(ABC): use_ddp2: bool use_ddp: bool testing: bool - single_gpu: bool + use_single_gpu: bool root_gpu: ... amp_level: str precision: ... @@ -79,7 +79,6 @@ class TrainerDPMixin(ABC): use_tpu: bool data_parallel_device_ids: ... progress_bar_callback: ... - tpu_id: Optional[int] on_colab_kaggle: str save_spawn_weights: Callable logger: ... @@ -129,7 +128,7 @@ def copy_trainer_model_properties(self, model): m.use_ddp = self.use_ddp m.use_amp = self.use_amp m.testing = self.testing - m.single_gpu = self.single_gpu + m.use_single_gpu = self.use_single_gpu m.use_tpu = self.use_tpu m.tpu_local_core_rank = self.tpu_local_core_rank m.tpu_global_core_rank = self.tpu_global_core_rank diff --git a/pytorch_lightning/trainer/evaluation_loop.py b/pytorch_lightning/trainer/evaluation_loop.py index f52f9c12f3c83..add9bb24c672a 100644 --- a/pytorch_lightning/trainer/evaluation_loop.py +++ b/pytorch_lightning/trainer/evaluation_loop.py @@ -163,7 +163,7 @@ class TrainerEvaluationLoopMixin(ABC): use_dp: bool use_ddp2: bool use_horovod: bool - single_gpu: bool + use_single_gpu: bool data_parallel_device_ids: ... 
model: LightningModule num_test_batches: List[int] @@ -616,7 +616,7 @@ def evaluation_forward(self, model, batch, batch_idx, dataloader_idx, test_mode: args[0] = batch # single GPU data transfer - if self.single_gpu: + if self.use_single_gpu: # for single GPU put inputs on gpu manually root_gpu = 0 if isinstance(self.data_parallel_device_ids, list): diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index b774ce13e1ef0..a2181c0bca6cc 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -51,7 +51,8 @@ from pytorch_lightning.utilities.debugging import InternalDebugger from pytorch_lightning.utilities.exceptions import MisconfigurationException from pytorch_lightning.trainer.configuration_validator import ConfigValidator -from pytorch_lightning import accelerator_backends +from pytorch_lightning.accelerator_backends import ( + GPUBackend, TPUBackend, CPUBackend, DDPSpawnBackend, DataParallelBackend) # warnings to ignore in trainer warnings.filterwarnings( @@ -1061,7 +1062,7 @@ def fit( elif 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ): task = int(os.environ['LOCAL_RANK']) - self.ddp_train(process_idx=task, q=None, model=model) + self.ddp_train(process_idx=task, mp_queue=None, model=model) elif self.use_ddp: @@ -1070,21 +1071,21 @@ def fit( if self.is_slurm_managing_tasks: task = int(os.environ['SLURM_LOCALID']) - self.ddp_train(process_idx=task, q=None, model=model) + self.ddp_train(process_idx=task, mp_queue=None, model=model) # torchelastic or general non_slurm ddp elif 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ): task = int(os.environ['LOCAL_RANK']) - self.ddp_train(process_idx=task, q=None, model=model) + self.ddp_train(process_idx=task, mp_queue=None, model=model) elif self.distributed_backend == 'ddp_cpu': - self.accelerator_backend = accelerator_backends.DDPSpawnBackend(self) + self.accelerator_backend = DDPSpawnBackend(self) self.accelerator_backend.setup() self.accelerator_backend.train(model, nprocs=self.num_processes) results = self.accelerator_backend.teardown(model) elif self.distributed_backend == 'ddp_spawn': - self.accelerator_backend = accelerator_backends.DDPSpawnBackend(self) + self.accelerator_backend = DDPSpawnBackend(self) self.accelerator_backend.setup() self.accelerator_backend.train(model, nprocs=self.num_processes) results = self.accelerator_backend.teardown(model) @@ -1094,7 +1095,7 @@ def fit( results = self.spawn_ddp_children(model) elif self.use_dp: - self.accelerator_backend = accelerator_backends.DataParallelBackend(self) + self.accelerator_backend = DataParallelBackend(self) self.accelerator_backend.setup(model) results = self.accelerator_backend.train() self.accelerator_backend.teardown() @@ -1102,19 +1103,19 @@ def fit( elif self.use_horovod: results = self.horovod_train(model) - elif self.single_gpu: - self.accelerator_backend = accelerator_backends.GPUBackend(self) + elif self.use_single_gpu: + self.accelerator_backend = GPUBackend(self) model = self.accelerator_backend.setup(model) results = self.accelerator_backend.train(model) elif self.use_tpu: - self.accelerator_backend = accelerator_backends.TPUBackend(self) + self.accelerator_backend = TPUBackend(self) self.accelerator_backend.setup() self.accelerator_backend.train(model) - self.accelerator_backend.teardown() + self.accelerator_backend.teardown(model) else: - self.accelerator_backend = accelerator_backends.CPUBackend(self) + 
self.accelerator_backend = CPUBackend(self) self.accelerator_backend.setup(model) results = self.accelerator_backend.train(model) diff --git a/pytorch_lightning/trainer/training_loop.py b/pytorch_lightning/trainer/training_loop.py index 7f12d20151c24..a0f0144679df9 100644 --- a/pytorch_lightning/trainer/training_loop.py +++ b/pytorch_lightning/trainer/training_loop.py @@ -217,7 +217,7 @@ class TrainerTrainLoopMixin(ABC): use_dp: bool use_ddp2: bool use_horovod: bool - single_gpu: bool + use_single_gpu: bool use_tpu: bool data_parallel_device_ids: ... check_val_every_n_epoch: ... @@ -1068,7 +1068,7 @@ def training_forward(self, batch, batch_idx, opt_idx, hiddens): output = self.model.training_step(*args) # single GPU forward - elif self.single_gpu: + elif self.use_single_gpu: gpu_id = 0 if isinstance(self.data_parallel_device_ids, list): gpu_id = self.data_parallel_device_ids[0] diff --git a/tests/base/datasets.py b/tests/base/datasets.py index 5eb32656c83a3..27e614eee68cf 100644 --- a/tests/base/datasets.py +++ b/tests/base/datasets.py @@ -1,6 +1,8 @@ import logging import os +import random import urllib.request +import time from typing import Tuple, Optional, Sequence import torch @@ -61,7 +63,7 @@ def __init__(self, root: str = PATH_DATASETS, train: bool = True, raise RuntimeError('Dataset not found.') data_file = self.TRAIN_FILE_NAME if self.train else self.TEST_FILE_NAME - self.data, self.targets = torch.load(os.path.join(self.cached_folder_path, data_file)) + self.data, self.targets = _try_load(os.path.join(self.cached_folder_path, data_file)) def __getitem__(self, idx: int) -> Tuple[Tensor, int]: img = self.data[idx].float().unsqueeze(0) @@ -103,6 +105,26 @@ def _download(self, data_folder: str) -> None: urllib.request.urlretrieve(url, fpath) +def _try_load(path_data, trials: int = 30, delta: float = 1.): + """Resolving loading from the same time from multiple concurrentprocesses.""" + res, exp = None, None + assert trials, "at least some trial has to be set" + assert os.path.isfile(path_data), 'missing file: %s' % path_data + for _ in range(trials): + try: + res = torch.load(path_data) + except Exception as ex: + exp = ex + time.sleep(delta * random.random()) + else: + break + else: + # raise the caught exception if any + if exp: + raise exp + return res + + def normalize_tensor(tensor: Tensor, mean: float = 0.0, std: float = 1.0) -> Tensor: tensor = tensor.clone() mean = torch.as_tensor(mean, dtype=tensor.dtype, device=tensor.device) @@ -187,7 +209,7 @@ def prepare_data(self, download: bool) -> None: for fname in (self.TRAIN_FILE_NAME, self.TEST_FILE_NAME): path_fname = os.path.join(super().cached_folder_path, fname) assert os.path.isfile(path_fname), 'Missing cached file: %s' % path_fname - data, targets = torch.load(path_fname) + data, targets = _try_load(path_fname) data, targets = self._prepare_subset(data, targets, self.num_samples, self.digits) torch.save((data, targets), os.path.join(self.cached_folder_path, fname)) diff --git a/tests/base/develop_utils.py b/tests/base/develop_utils.py index 068c8294ee5cb..ada745951494c 100644 --- a/tests/base/develop_utils.py +++ b/tests/base/develop_utils.py @@ -60,7 +60,7 @@ def get_data_path(expt_logger, path_dir=None): return path_expt -def load_model_from_checkpoint(logger, root_weights_dir, module_class=EvalModelTemplate, path_expt=None): +def load_model_from_checkpoint(logger, root_weights_dir, module_class=EvalModelTemplate): trained_model = module_class.load_from_checkpoint(root_weights_dir) assert trained_model is not None, 
'loading model failed' return trained_model @@ -89,6 +89,7 @@ def init_checkpoint_callback(logger): def pl_multi_process_test(func): + """Wrapper for running multi-processing tests.""" @functools.wraps(func) def wrapper(*args, **kwargs): @@ -100,15 +101,16 @@ def inner_f(queue, **kwargs): try: func(**kwargs) queue.put(1) - except Exception as e: + except Exception: import traceback traceback.print_exc() queue.put(-1) - p = Process(target=inner_f, args=(queue,), kwargs=kwargs) - p.start() - p.join() + proc = Process(target=inner_f, args=(queue,), kwargs=kwargs) + proc.start() + proc.join() + result = queue.get() - assert result == 1 + assert result == 1, 'expected 1, but returned %s' % result return wrapper diff --git a/tests/base/test_datasets.py b/tests/base/test_datasets.py new file mode 100644 index 0000000000000..2b3cd603b74cd --- /dev/null +++ b/tests/base/test_datasets.py @@ -0,0 +1,19 @@ +import pickle + +import cloudpickle +import pytest + +from tests.base.datasets import MNIST, TrialMNIST, AverageDataset + + +@pytest.mark.parametrize('dataset_cls', [MNIST, TrialMNIST, AverageDataset]) +def test_pickling_dataset_mnist(tmpdir, dataset_cls): + mnist = dataset_cls() + + mnist_pickled = pickle.dumps(mnist) + mnist_loaded = pickle.loads(mnist_pickled) + # assert vars(mnist) == vars(mnist_loaded) + + mnist_pickled = cloudpickle.dumps(mnist) + mnist_loaded = cloudpickle.loads(mnist_pickled) + # assert vars(mnist) == vars(mnist_loaded) diff --git a/tests/models/test_grad_norm.py b/tests/models/test_grad_norm.py index 7483167755a8f..dc7eee557d448 100644 --- a/tests/models/test_grad_norm.py +++ b/tests/models/test_grad_norm.py @@ -42,8 +42,8 @@ def on_after_backward(self): self.stored_grad_norms.append(out) -@pytest.mark.parametrize("norm_type", [1., 1.25, 1.5, 2, 3, 5, 10, 'inf']) -def test_grad_tracking(tmpdir, norm_type, rtol=1e-2): +@pytest.mark.parametrize("norm_type", [1., 1.25, 2, 3, 5, 10, 'inf']) +def test_grad_tracking(tmpdir, norm_type, rtol=5e-3): os.environ['PL_DEV_DEBUG'] = '1' # rtol=5e-3 respects the 3 decimals rounding in `.grad_norms` and above diff --git a/tests/models/test_tpu.py b/tests/models/test_tpu.py index 754e2652225c0..89d5dce279840 100644 --- a/tests/models/test_tpu.py +++ b/tests/models/test_tpu.py @@ -1,17 +1,19 @@ import os -from unittest.mock import patch import pytest +from torch.utils.data import DataLoader +import tests.base.develop_pipelines as tpipes from pytorch_lightning import Trainer from pytorch_lightning.utilities.exceptions import MisconfigurationException from tests.base import EvalModelTemplate -import tests.base.develop_pipelines as tpipes from tests.base.datasets import TrialMNIST -from torch.utils.data import DataLoader +from tests.base.develop_utils import pl_multi_process_test try: import torch_xla + import torch_xla.distributed.xla_multiprocessing as xmp + SERIAL_EXEC = xmp.MpSerialExecutor() # TODO: The tests are aborted if the following lines are uncommented. 
Must be resolved with XLA team # device = torch_xla.core.xla_model.xla_device() # device_type = torch_xla.core.xla_model.xla_device_hw(device) @@ -22,16 +24,33 @@ TPU_AVAILABLE = True +_LARGER_DATASET = TrialMNIST( + download=True, + num_samples=2000, + digits=(0, 1, 2, 5, 8), +) + + +# 8 cores needs a big dataset +def _serial_train_loader(): + return DataLoader( + _LARGER_DATASET, + batch_size=32, + ) + + @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") -def test_base_tpu_model_1(tmpdir): +@pl_multi_process_test +def test_model_tpu_cores_1(tmpdir): """Make sure model trains on TPU.""" trainer_options = dict( default_root_dir=tmpdir, progress_bar_refresh_rate=0, max_epochs=1, + distributed_backend='tpu', tpu_cores=1, limit_train_batches=0.4, - limit_val_batches=0.4 + limit_val_batches=0.4, ) model = EvalModelTemplate() @@ -39,56 +58,78 @@ def test_base_tpu_model_1(tmpdir): @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") -def test_base_tpu_model_idx_1(tmpdir): +@pl_multi_process_test +def test_model_tpu_index_1(tmpdir): """Make sure model trains on TPU.""" trainer_options = dict( default_root_dir=tmpdir, progress_bar_refresh_rate=0, max_epochs=1, + distributed_backend='tpu', tpu_cores=[1], limit_train_batches=0.4, - limit_val_batches=0.4 + limit_val_batches=0.4, ) model = EvalModelTemplate() tpipes.run_model_test(trainer_options, model, on_gpu=False, with_hpc=False) + assert torch_xla._XLAC._xla_get_default_device() == 'xla:1' @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") -def test_base_tpu_model_8(tmpdir): +@pl_multi_process_test +def test_model_tpu_index_5(tmpdir): """Make sure model trains on TPU.""" trainer_options = dict( default_root_dir=tmpdir, progress_bar_refresh_rate=0, max_epochs=1, - tpu_cores=8, + distributed_backend='tpu', + tpu_cores=[5], limit_train_batches=0.4, - limit_val_batches=0.4 + limit_val_batches=0.4, ) model = EvalModelTemplate() + tpipes.run_model_test(trainer_options, model, on_gpu=False, with_hpc=False) + assert torch_xla._XLAC._xla_get_default_device() == 'xla:5' + +@pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") +@pl_multi_process_test +def test_model_tpu_cores_8(tmpdir): + """Make sure model trains on TPU.""" + trainer_options = dict( + default_root_dir=tmpdir, + progress_bar_refresh_rate=0, + max_epochs=1, + distributed_backend='tpu', + tpu_cores=8, + limit_train_batches=0.4, + limit_val_batches=0.4, + ) + + model = EvalModelTemplate() # 8 cores needs a big dataset - def long_train_loader(): - dataset = DataLoader(TrialMNIST(download=True, num_samples=15000, digits=(0, 1, 2, 5, 8)), batch_size=32) - return dataset - model.train_dataloader = long_train_loader - model.val_dataloader = long_train_loader + model.train_dataloader = _serial_train_loader + model.val_dataloader = _serial_train_loader tpipes.run_model_test(trainer_options, model, on_gpu=False, with_hpc=False) @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") -def test_base_tpu_16bit_model_core_1(tmpdir): +@pl_multi_process_test +def test_model_16bit_tpu_cores_1(tmpdir): """Make sure model trains on TPU.""" trainer_options = dict( default_root_dir=tmpdir, precision=16, progress_bar_refresh_rate=0, max_epochs=1, + distributed_backend='tpu', tpu_cores=1, limit_train_batches=0.4, - limit_val_batches=0.4 + limit_val_batches=0.4, ) model = EvalModelTemplate() @@ -97,98 +138,108 @@ def test_base_tpu_16bit_model_core_1(tmpdir): @pytest.mark.skipif(not TPU_AVAILABLE, 
reason="test requires TPU machine") -def test_base_tpu_16bit_model_idx_core(tmpdir): +@pl_multi_process_test +def test_model_16bit_tpu_index_1(tmpdir): """Make sure model trains on TPU.""" trainer_options = dict( default_root_dir=tmpdir, precision=16, progress_bar_refresh_rate=0, max_epochs=1, + distributed_backend='tpu', tpu_cores=[1], limit_train_batches=0.4, - limit_val_batches=0.4 + limit_val_batches=0.4, ) model = EvalModelTemplate() tpipes.run_model_test(trainer_options, model, on_gpu=False) + assert torch_xla._XLAC._xla_get_default_device() == 'xla:1' assert os.environ.get('XLA_USE_BF16') == str(1), "XLA_USE_BF16 was not set in environment variables" @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") -def test_base_tpu_16bit_model_8_cores(tmpdir): +@pl_multi_process_test +def test_model_16bit_tpu_cores_8(tmpdir): """Make sure model trains on TPU.""" trainer_options = dict( default_root_dir=tmpdir, precision=16, progress_bar_refresh_rate=0, max_epochs=1, + distributed_backend='tpu', tpu_cores=8, limit_train_batches=0.4, - limit_val_batches=0.4 + limit_val_batches=0.4, ) model = EvalModelTemplate() - # 8 cores needs a big dataset - def long_train_loader(): - dataset = DataLoader(TrialMNIST(download=True, num_samples=15000, digits=(0, 1, 2, 5, 8)), batch_size=32) - return dataset - model.train_dataloader = long_train_loader - model.val_dataloader = long_train_loader + model.train_dataloader = _serial_train_loader + model.val_dataloader = _serial_train_loader - tpipes.run_model_test(trainer_options, model, on_gpu=False) - assert os.environ.get('XLA_USE_BF16') == str(1), "XLA_USE_BF16 was not set in environment variables" + tpipes.run_model_test(trainer_options, model, on_gpu=False, with_hpc=False) @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") -def test_early_stop_checkpoints_on_tpu(tmpdir): - """Test if single TPU core training works""" +@pl_multi_process_test +def test_model_16bit_tpu_index_5(tmpdir): + """Test if distributed TPU core training works""" model = EvalModelTemplate() trainer = Trainer( - early_stop_callback=True, default_root_dir=tmpdir, - progress_bar_refresh_rate=0, - max_epochs=50, - limit_train_batches=10, - limit_val_batches=10, - tpu_cores=[8], + precision=16, + max_epochs=1, + train_percent_check=0.4, + val_percent_check=0.2, + distributed_backend='tpu', + tpu_cores=[5], ) trainer.fit(model) - assert torch_xla._XLAC._xla_get_default_device() == 'xla:8' + assert torch_xla._XLAC._xla_get_default_device() == 'xla:5' + assert os.environ.get('XLA_USE_BF16') == str(1), "XLA_USE_BF16 was not set in environment variables" @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") -def test_single_tpu_core_model(tmpdir): +@pl_multi_process_test +def test_early_stop_checkpoints_on_tpu(tmpdir): """Test if single TPU core training works""" model = EvalModelTemplate() trainer = Trainer( + early_stop_callback=True, default_root_dir=tmpdir, progress_bar_refresh_rate=0, - max_epochs=1, - train_percent_check=0.1, - val_percent_check=0.1, - tpu_cores=8, + max_epochs=50, + limit_train_batches=10, + limit_val_batches=10, + distributed_backend='tpu', + tpu_cores=[1], ) trainer.fit(model) - assert torch_xla._XLAC._xla_get_default_device() == 'xla:8' + assert torch_xla._XLAC._xla_get_default_device() == 'xla:1' @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") -def test_multi_core_tpu_model(tmpdir): - """Test if distributed TPU core training works""" +@pl_multi_process_test +def 
test_early_stop_checkpoints_on_tpu(tmpdir): + """Test if single TPU core training works""" model = EvalModelTemplate() trainer = Trainer( + early_stop_callback=True, default_root_dir=tmpdir, - max_epochs=1, - train_percent_check=0.4, - val_percent_check=0.2, - tpu_cores=[1, 8], + progress_bar_refresh_rate=0, + max_epochs=50, + limit_train_batches=10, + limit_val_batches=10, + distributed_backend='tpu', + tpu_cores=[5], ) trainer.fit(model) - assert trainer.tpu_id is None + assert torch_xla._XLAC._xla_get_default_device() == 'xla:5' @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine") +@pl_multi_process_test def test_dataloaders_passed_to_fit(tmpdir): """Test if dataloaders passed to trainer works on TPU""" @@ -197,6 +248,7 @@ def test_dataloaders_passed_to_fit(tmpdir): trainer = Trainer( default_root_dir=tmpdir, max_epochs=1, + distributed_backend='tpu', tpu_cores=8, ) result = trainer.fit( @@ -218,7 +270,17 @@ def test_tpu_id_to_be_as_expected(tpu_cores, expected_tpu_id): assert Trainer(tpu_cores=tpu_cores).tpu_id == expected_tpu_id -@patch('pytorch_lightning.trainer.trainer.XLA_AVAILABLE', False) +def test_tpu_misconfiguration(): + """Test if trainer.tpu_id is set as expected""" + with pytest.raises(MisconfigurationException, match="`tpu_cores` can only be"): + Trainer( + tpu_cores=[1, 8], + distributed_backend='tpu', + ) + + +# @patch('pytorch_lightning.trainer.trainer.XLA_AVAILABLE', False) +@pytest.mark.skipif(TPU_AVAILABLE, reason="test requires missing TPU") def test_exception_when_no_tpu_found(tmpdir): """Test if exception is thrown when xla devices are not available""" model = EvalModelTemplate() @@ -227,8 +289,9 @@ def test_exception_when_no_tpu_found(tmpdir): max_epochs=1, train_percent_check=0.4, val_percent_check=0.2, + distributed_backend='tpu', tpu_cores=8, ) - with pytest.raises(MisconfigurationException, match='No TPU devices found.'): + with pytest.raises(MisconfigurationException, match='PyTorch XLA not installed.'): trainer.fit(model) diff --git a/tests/trainer/test_trainer.py b/tests/trainer/test_trainer.py index bfca63be60aa0..e11ed287f924d 100644 --- a/tests/trainer/test_trainer.py +++ b/tests/trainer/test_trainer.py @@ -814,79 +814,79 @@ def test_num_sanity_val_steps(tmpdir, limit_val_batches): @pytest.mark.parametrize("trainer_kwargs,expected", [ pytest.param( dict(distributed_backend=None, gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, single_gpu=False, num_processes=1) + dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) ), pytest.param( dict(distributed_backend="dp", gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, single_gpu=False, num_processes=1) + dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) ), pytest.param( dict(distributed_backend="dp", gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, single_gpu=False, num_processes=1) + dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) ), pytest.param( dict(distributed_backend="ddp", gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, single_gpu=False, num_processes=1) + dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) ), pytest.param( dict(distributed_backend="ddp", 
num_processes=2, gpus=None), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, single_gpu=False, num_processes=2) + dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=2) ), pytest.param( dict(distributed_backend="ddp", num_nodes=2, gpus=None), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, single_gpu=False, num_processes=1) + dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) ), pytest.param( dict(distributed_backend="ddp_cpu", num_processes=2, gpus=None), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, single_gpu=False, num_processes=2) + dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=2) ), pytest.param( dict(distributed_backend="ddp2", gpus=None), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, single_gpu=False, num_processes=1) + dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=1) ), pytest.param( dict(distributed_backend=None, gpus=1), - dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=1, on_gpu=True, single_gpu=True, num_processes=1), + dict(use_dp=False, use_ddp=False, use_ddp2=False, num_gpus=1, on_gpu=True, use_single_gpu=True, num_processes=1), marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] ), pytest.param( dict(distributed_backend="dp", gpus=1), - dict(use_dp=True, use_ddp=False, use_ddp2=False, num_gpus=1, on_gpu=True, single_gpu=True, num_processes=1), + dict(use_dp=True, use_ddp=False, use_ddp2=False, num_gpus=1, on_gpu=True, use_single_gpu=True, num_processes=1), marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] ), pytest.param( dict(distributed_backend="ddp", gpus=1), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=1, on_gpu=True, single_gpu=True, num_processes=1), + dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=1, on_gpu=True, use_single_gpu=True, num_processes=1), marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] ), pytest.param( dict(distributed_backend="ddp_cpu", num_processes=2, gpus=1), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, single_gpu=False, num_processes=2), + dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=0, on_gpu=False, use_single_gpu=False, num_processes=2), marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] ), pytest.param( dict(distributed_backend="ddp2", gpus=1), - dict(use_dp=False, use_ddp=False, use_ddp2=True, num_gpus=1, on_gpu=True, single_gpu=False, num_processes=1), + dict(use_dp=False, use_ddp=False, use_ddp2=True, num_gpus=1, on_gpu=True, use_single_gpu=False, num_processes=1), marks=[pytest.mark.skipif(torch.cuda.device_count() == 0, reason="GPU needed")] ), pytest.param( dict(distributed_backend=None, gpus=2), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=2, on_gpu=True, single_gpu=False, num_processes=2), + dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=2, on_gpu=True, use_single_gpu=False, num_processes=2), marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")] ), pytest.param( dict(distributed_backend="dp", gpus=2), - dict(use_dp=True, use_ddp=False, use_ddp2=False, num_gpus=2, on_gpu=True, single_gpu=False, num_processes=1), + 
dict(use_dp=True, use_ddp=False, use_ddp2=False, num_gpus=2, on_gpu=True, use_single_gpu=False, num_processes=1), marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")] ), pytest.param( dict(distributed_backend="ddp", gpus=2), - dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=2, on_gpu=True, single_gpu=False, num_processes=2), + dict(use_dp=False, use_ddp=True, use_ddp2=False, num_gpus=2, on_gpu=True, use_single_gpu=False, num_processes=2), marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")] ), pytest.param( dict(distributed_backend="ddp2", gpus=2), - dict(use_dp=False, use_ddp=False, use_ddp2=True, num_gpus=2, on_gpu=True, single_gpu=False, num_processes=1), + dict(use_dp=False, use_ddp=False, use_ddp2=True, num_gpus=2, on_gpu=True, use_single_gpu=False, num_processes=1), marks=[pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Multiple GPUs needed")] ), ]) @@ -897,7 +897,7 @@ def test_trainer_config(trainer_kwargs, expected): assert trainer.use_ddp2 is expected["use_ddp2"] assert trainer.num_gpus == expected["num_gpus"] assert trainer.on_gpu is expected["on_gpu"] - assert trainer.single_gpu is expected["single_gpu"] + assert trainer.use_single_gpu is expected["use_single_gpu"] assert trainer.num_processes == expected["num_processes"] diff --git a/tests/models/test_test_loop.py b/tests/trainer/test_trainer_test_loop.py similarity index 95% rename from tests/models/test_test_loop.py rename to tests/trainer/test_trainer_test_loop.py index 10d8d35800e81..e8151c2c5bd1d 100644 --- a/tests/models/test_test_loop.py +++ b/tests/trainer/test_trainer_test_loop.py @@ -1,9 +1,9 @@ -import os +import pytest +import torch + import pytorch_lightning as pl -from tests.base import EvalModelTemplate import tests.base.develop_utils as tutils -import torch -import pytest +from tests.base import EvalModelTemplate @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine") @@ -12,7 +12,7 @@ def test_single_gpu_test(tmpdir): model = EvalModelTemplate() trainer = pl.Trainer( - default_root_dir=os.getcwd(), + default_root_dir=tmpdir, max_epochs=2, limit_train_batches=10, limit_val_batches=10, @@ -43,7 +43,7 @@ def test_dp_test(tmpdir): model = EvalModelTemplate() trainer = pl.Trainer( - default_root_dir=os.getcwd(), + default_root_dir=tmpdir, max_epochs=2, limit_train_batches=10, limit_val_batches=10, @@ -72,7 +72,7 @@ def test_ddp_spawn_test(tmpdir): model = EvalModelTemplate() trainer = pl.Trainer( - default_root_dir=os.getcwd(), + default_root_dir=tmpdir, max_epochs=2, limit_train_batches=10, limit_val_batches=10,
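
The TPU tests above exercise the reworked `tpu_cores` semantics: an integer requests that many cores (8 spawns one process per core via `xmp.spawn`), a one-element list pins training to that core index (surfaced as `trainer.tpu_id`), a mixed list such as `[1, 8]` raises `MisconfigurationException`, and `precision=16` switches XLA to bfloat16 by setting `XLA_USE_BF16=1`. A condensed usage sketch mirroring those test configurations (it assumes a working TPU/XLA environment; the option values are illustrative):

    from pytorch_lightning import Trainer
    from tests.base import EvalModelTemplate

    model = EvalModelTemplate()

    # single core, chosen by XLA
    single_core = Trainer(distributed_backend='tpu', tpu_cores=1, max_epochs=1)

    # pin to a specific core; trainer.tpu_id == 5 and the default device becomes 'xla:5'
    pinned_core = Trainer(distributed_backend='tpu', tpu_cores=[5], max_epochs=1)

    # all 8 cores with bfloat16 (the backend exports XLA_USE_BF16=1)
    all_cores = Trainer(distributed_backend='tpu', tpu_cores=8, precision=16, max_epochs=1)

    all_cores.fit(model)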
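
Both spawn-style backends in this patch hand state back to the parent process through a multiprocessing queue rather than return values: `transfer_distrib_spawn_state_on_fit_end` puts the best checkpoint path, the results, and a last-weights path, and `teardown` reads them back in the same order. A minimal sketch of that hand-off using only the standard library (the worker body and paths below are placeholders, not Lightning's API):

    import multiprocessing as mp


    def _worker(process_idx, mp_queue):
        # stand-in for ddp_train / tpu_train_in_process: run training, then
        # persist whatever the parent needs through the queue (rank 0 only)
        best_path = '/tmp/best_%d.ckpt' % process_idx  # placeholder path
        results = {'val_loss': 0.1}                    # placeholder results
        last_path = None                               # no extra weights saved here
        if process_idx == 0 and mp_queue is not None:
            mp_queue.put(best_path)
            mp_queue.put(results)
            mp_queue.put(last_path)


    if __name__ == '__main__':
        # the DDP spawn backend uses the 'spawn' context; the TPU backend uses 'fork'
        ctx = mp.get_context('spawn')
        mp_queue = ctx.SimpleQueue()

        procs = [ctx.Process(target=_worker, args=(idx, mp_queue)) for idx in range(2)]
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()

        # teardown(): restore state in the main process, in the order it was put
        best_path = mp_queue.get()
        results = mp_queue.get()
        last_path = mp_queue.get()
        print(best_path, results, last_path)

The queue is what makes this work under `mp.spawn`/`xmp.spawn`, whose worker return values are discarded; the TPU backend additionally pins `start_method = 'fork'`, citing the segfault report for multiprocessing queues linked in `tpu_backend.py`.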