From 9496c68f9486ea9e4e9bb59d53d8173ec35f9d8f Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Wed, 5 Jun 2019 10:19:13 +0200 Subject: [PATCH 1/7] Add non-distributed PyTorch runner --- .../sgd/pytorch/distributed_pytorch_runner.py | 129 ++++++++++++++++++ .../sgd/pytorch/pytorch_runner.py | 66 ++------- .../sgd/pytorch/pytorch_trainer.py | 33 +++-- python/ray/experimental/sgd/pytorch/utils.py | 5 + .../experimental/sgd/tests/test_pytorch.py | 20 +-- 5 files changed, 175 insertions(+), 78 deletions(-) create mode 100644 python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py diff --git a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py new file mode 100644 index 000000000000..6bcfedaf9a75 --- /dev/null +++ b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py @@ -0,0 +1,129 @@ +import logging +import os +import torch.distributed as dist +import torch.utils.data + +from ray.experimental.sgd.pytorch.pytorch_runner import PyTorchRunner + +logger = logging.getLogger(__name__) + + +class DistributedPyTorchRunner(PyTorchRunner): + """Manages a distributed PyTorch model replica""" + + def __init__(self, + model_creator, + data_creator, + optimizer_creator, + config=None, + batch_size=16, + backend="gloo"): + """Initializes the runner. + + Args: + model_creator (dict -> torch.nn.Module): creates the model using + the config. + data_creator (dict -> Dataset, Dataset): creates the training and + validation data sets using the config. + optimizer_creator (torch.nn.Module, dict -> loss, optimizer): + creates the loss and optimizer using the model and the config. + config (dict): configuration passed to 'model_creator', + 'data_creator', and 'optimizer_creator'. + batch_size (int): batch size used in an update. + backend (string): backend used by distributed PyTorch. + """ + super(DistributedPyTorchRunner, self).__init__( + model_creator, data_creator, optimizer_creator, config, batch_size) + self.backend = backend + + def setup(self, url, world_rank, world_size): + """Connects to the distributed PyTorch backend and initializes the model. + + Args: + url (str): the URL used to connect to distributed PyTorch. + world_rank (int): the index of the runner. + world_size (int): the total number of runners. 
+ """ + self._setup_distributed_pytorch(url, world_rank, world_size) + self._setup_training() + + def _setup_distributed_pytorch(self, url, world_rank, world_size): + os.environ["CUDA_LAUNCH_BLOCKING"] = "1" + with self._timers["setup_proc"]: + self.world_rank = world_rank + logger.debug( + "Connecting to {} world_rank: {} world_size: {}".format( + url, world_rank, world_size)) + logger.debug("using {}".format(self.backend)) + dist.init_process_group( + backend=self.backend, + init_method=url, + rank=world_rank, + world_size=world_size) + + def _setup_training(self): + logger.debug("Creating model") + self.model = self.model_creator(self.config) + if torch.cuda.is_available(): + self.model = torch.nn.parallel.DistributedDataParallel( + self.model.cuda()) + else: + self.model = torch.nn.parallel.DistributedDataParallelCPU( + self.model) + + logger.debug("Creating optimizer") + self.criterion, self.optimizer = self.optimizer_creator( + self.model, self.config) + if torch.cuda.is_available(): + self.criterion = self.criterion.cuda() + + logger.debug("Creating dataset") + self.training_set, self.validation_set = self.data_creator(self.config) + + # TODO: make num_workers configurable + self.train_sampler = torch.utils.data.distributed.DistributedSampler( + self.training_set) + self.train_loader = torch.utils.data.DataLoader( + self.training_set, + batch_size=self.batch_size, + shuffle=(self.train_sampler is None), + num_workers=2, + pin_memory=False, + sampler=self.train_sampler) + + self.validation_sampler = ( + torch.utils.data.distributed.DistributedSampler( + self.validation_set)) + self.validation_loader = torch.utils.data.DataLoader( + self.validation_set, + batch_size=self.batch_size, + shuffle=(self.validation_sampler is None), + num_workers=2, + pin_memory=False, + sampler=self.validation_sampler) + + def step(self): + """Runs a training epoch and updates the model parameters""" + logger.debug("Starting step") + self.train_sampler.set_epoch(self.epoch) + return super(DistributedPyTorchRunner, self).step() + + def get_state(self): + """Returns the state of the runner""" + return { + "epoch": self.epoch, + "model": self.model.module.state_dict(), + "optimizer": self.optimizer.state_dict(), + "stats": self.stats() + } + + def set_state(self, state): + """Sets the state of the model""" + # TODO: restore timer stats + self.model.module.load_state_dict(state["model"]) + self.optimizer.load_state_dict(state["optimizer"]) + self.epoch = state["stats"]["epoch"] + + def shutdown(self): + """Attempts to shut down the worker""" + dist.destroy_process_group() diff --git a/python/ray/experimental/sgd/pytorch/pytorch_runner.py b/python/ray/experimental/sgd/pytorch/pytorch_runner.py index 5fe4ba1009f9..bfa1fc54386f 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_runner.py @@ -3,9 +3,7 @@ from __future__ import print_function import logging -import os import torch -import torch.distributed as dist import torch.utils.data import ray @@ -15,15 +13,14 @@ class PyTorchRunner(object): - """Manages a distributed PyTorch model replica""" + """Manages a PyTorch model for training""" def __init__(self, model_creator, data_creator, optimizer_creator, config=None, - batch_size=16, - backend="gloo"): + batch_size=16): """Initializes the runner. Args: @@ -36,7 +33,6 @@ def __init__(self, config (dict): configuration passed to 'model_creator', 'data_creator', and 'optimizer_creator'. batch_size (int): batch size used in an update. 
- backend (string): backend used by distributed PyTorch. """ self.model_creator = model_creator @@ -44,7 +40,6 @@ def __init__(self, self.optimizer_creator = optimizer_creator self.config = {} if config is None else config self.batch_size = batch_size - self.backend = backend self.verbose = True self.epoch = 0 @@ -56,72 +51,34 @@ def __init__(self, ] } - def setup(self, url, world_rank, world_size): - """Connects to the distributed PyTorch backend and initializes the model. - - Args: - url (str): the URL used to connect to distributed PyTorch. - world_rank (int): the index of the runner. - world_size (int): the total number of runners. - """ - self._setup_distributed_pytorch(url, world_rank, world_size) - self._setup_training() - - def _setup_distributed_pytorch(self, url, world_rank, world_size): - os.environ["CUDA_LAUNCH_BLOCKING"] = "1" - with self._timers["setup_proc"]: - self.world_rank = world_rank - logger.debug( - "Connecting to {} world_rank: {} world_size: {}".format( - url, world_rank, world_size)) - logger.debug("using {}".format(self.backend)) - dist.init_process_group( - backend=self.backend, - init_method=url, - rank=world_rank, - world_size=world_size) - - def _setup_training(self): + def setup(self): + """Initializes the model""" logger.debug("Creating model") self.model = self.model_creator(self.config) if torch.cuda.is_available(): - self.model = torch.nn.parallel.DistributedDataParallel( - self.model.cuda()) - else: - self.model = torch.nn.parallel.DistributedDataParallelCPU( - self.model) + self.model = self.model.cuda() logger.debug("Creating optimizer") self.criterion, self.optimizer = self.optimizer_creator( self.model, self.config) - if torch.cuda.is_available(): self.criterion = self.criterion.cuda() logger.debug("Creating dataset") self.training_set, self.validation_set = self.data_creator(self.config) - - # TODO: make num_workers configurable - self.train_sampler = torch.utils.data.distributed.DistributedSampler( - self.training_set) self.train_loader = torch.utils.data.DataLoader( self.training_set, batch_size=self.batch_size, - shuffle=(self.train_sampler is None), + shuffle=True, num_workers=2, - pin_memory=False, - sampler=self.train_sampler) + pin_memory=False) - self.validation_sampler = ( - torch.utils.data.distributed.DistributedSampler( - self.validation_set)) self.validation_loader = torch.utils.data.DataLoader( self.validation_set, batch_size=self.batch_size, - shuffle=(self.validation_sampler is None), + shuffle=True, num_workers=2, - pin_memory=False, - sampler=self.validation_sampler) + pin_memory=False) def get_node_ip(self): """Returns the IP address of the current node""" @@ -129,9 +86,6 @@ def get_node_ip(self): def step(self): """Runs a training epoch and updates the model parameters""" - logger.debug("Starting step") - self.train_sampler.set_epoch(self.epoch) - logger.debug("Begin Training Epoch {}".format(self.epoch + 1)) with self._timers["training"]: train_stats = utils.train(self.train_loader, self.model, @@ -179,4 +133,4 @@ def set_state(self, state): def shutdown(self): """Attempts to shut down the worker""" - dist.destroy_process_group() + pass diff --git a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py index 073ad3d34042..0aaf07d3ae0d 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py @@ -10,6 +10,8 @@ import ray from ray.experimental.sgd.pytorch.pytorch_runner import PyTorchRunner +from 
ray.experimental.sgd.pytorch.distributed_pytorch_runner import ( + DistributedPyTorchRunner) from ray.experimental.sgd.pytorch import utils logger = logging.getLogger(__name__) @@ -68,10 +70,17 @@ def __init__(self, if backend == "auto": backend = "nccl" if resources_per_replica.num_gpus > 0 else "gloo" - Runner = ray.remote( - num_cpus=resources_per_replica.num_cpus, - num_gpus=resources_per_replica.num_gpus, - resources=resources_per_replica.resources)(PyTorchRunner) + if num_replicas == 1: + Runner = ray.remote( + num_cpus=resources_per_replica.num_cpus, + num_gpus=resources_per_replica.num_gpus, + resources=resources_per_replica.resources)(PyTorchRunner) + else: + Runner = ray.remote( + num_cpus=resources_per_replica.num_cpus, + num_gpus=resources_per_replica.num_gpus, + resources=resources_per_replica.resources)( + DistributedPyTorchRunner) batch_size_per_replica = batch_size // num_replicas if batch_size % num_replicas > 0: @@ -85,6 +94,9 @@ def __init__(self, num_replicas=num_replicas)) self.workers = [ + Runner.remote(model_creator, data_creator, optimizer_creator, + self.config, batch_size_per_replica) + if num_replicas == 1 else Runner.remote(model_creator, data_creator, optimizer_creator, self.config, batch_size_per_replica, backend) for i in range(num_replicas) @@ -96,7 +108,9 @@ def __init__(self, # Get setup tasks in order to throw errors on failure ray.get([ - worker.setup.remote(address, i, len(self.workers)) + worker.setup.remote() + if num_replicas == 1 else worker.setup.remote( + address, i, len(self.workers)) for i, worker in enumerate(self.workers) ]) @@ -122,14 +136,7 @@ def get_model(self): """Returns the learned model""" model = self.model_creator(self.config) state = ray.get(self.workers[0].get_state.remote()) - - # Remove module. prefix added by distrbuted pytorch - state_dict = { - k.replace("module.", ""): v - for k, v in state["model"].items() - } - - model.load_state_dict(state_dict) + model.load_state_dict(state["model"]) return model def save(self, ckpt): diff --git a/python/ray/experimental/sgd/pytorch/utils.py b/python/ray/experimental/sgd/pytorch/utils.py index f7c6e4abac97..aede4ab70bac 100644 --- a/python/ray/experimental/sgd/pytorch/utils.py +++ b/python/ray/experimental/sgd/pytorch/utils.py @@ -238,3 +238,8 @@ def sgd_mse_optimizer(model, config): criterion = nn.MSELoss() optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate) return criterion, optimizer + + +def clean_state_dict(state_dict): + """Removes the 'module.' 
prefix added by PyTorch""" + return {k.replace("module.", ""): v for k, v in state_dict.items()} diff --git a/python/ray/experimental/sgd/tests/test_pytorch.py b/python/ray/experimental/sgd/tests/test_pytorch.py index faff23f8a809..3cf26a264c75 100644 --- a/python/ray/experimental/sgd/tests/test_pytorch.py +++ b/python/ray/experimental/sgd/tests/test_pytorch.py @@ -15,14 +15,15 @@ model_creator, optimizer_creator, data_creator) -@pytest.mark.skipif( # noqa: F811 - sys.platform == "darwin", reason="Doesn't work on macOS.") -def test_train(ray_start_2_cpus): # noqa: F811 +# Distributed PyTorch doesn't work with macOS, so test with only 1 replica +@pytest.mark.parametrize( # noqa: F811 + "num_replicas", [1] if sys.platform == "darwin" else [1, 2]) +def test_train(ray_start_2_cpus, num_replicas): # noqa: F811 trainer = PyTorchTrainer( model_creator, data_creator, optimizer_creator, - num_replicas=2, + num_replicas=num_replicas, resources_per_replica=Resources(num_cpus=1)) train_loss1 = trainer.train()["train_loss"] validation_loss1 = trainer.validate()["validation_loss"] @@ -37,14 +38,15 @@ def test_train(ray_start_2_cpus): # noqa: F811 assert validation_loss2 <= validation_loss1 -@pytest.mark.skipif( # noqa: F811 - sys.platform == "darwin", reason="Doesn't work on macOS.") -def test_save_and_restore(ray_start_2_cpus): # noqa: F811 +# Distributed PyTorch doesn't work with macOS, so test with only 1 replica +@pytest.mark.parametrize( # noqa: F811 + "num_replicas", [1] if sys.platform == "darwin" else [1, 2]) +def test_save_and_restore(ray_start_2_cpus, num_replicas): # noqa: F811 trainer1 = PyTorchTrainer( model_creator, data_creator, optimizer_creator, - num_replicas=2, + num_replicas=num_replicas, resources_per_replica=Resources(num_cpus=1)) trainer1.train() @@ -59,7 +61,7 @@ def test_save_and_restore(ray_start_2_cpus): # noqa: F811 model_creator, data_creator, optimizer_creator, - num_replicas=2, + num_replicas=num_replicas, resources_per_replica=Resources(num_cpus=1)) trainer2.restore(filename) From 0157932156ccb285648f5e94f99774204f0a8868 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Wed, 5 Jun 2019 10:46:39 +0200 Subject: [PATCH 2/7] use dist.is_available() instead of checking OS --- python/ray/experimental/sgd/pytorch/pytorch_trainer.py | 4 ++-- python/ray/experimental/sgd/tests/test_pytorch.py | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py index 0aaf07d3ae0d..b37468b27f84 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py @@ -3,8 +3,8 @@ from __future__ import print_function import numpy as np -import sys import torch +import torch.distributed as dist import logging import ray @@ -53,7 +53,7 @@ def __init__(self, """ # TODO: add support for mixed precision # TODO: add support for callbacks - if sys.platform == "darwin": + if num_replicas > 1 and not dist.is_available(): raise Exception( ("Distributed PyTorch is not supported on macOS. 
For more " "information, see " diff --git a/python/ray/experimental/sgd/tests/test_pytorch.py b/python/ray/experimental/sgd/tests/test_pytorch.py index 3cf26a264c75..aa0596aa158c 100644 --- a/python/ray/experimental/sgd/tests/test_pytorch.py +++ b/python/ray/experimental/sgd/tests/test_pytorch.py @@ -4,9 +4,9 @@ import os import pytest -import sys import tempfile import torch +import torch.distributed as dist from ray.tests.conftest import ray_start_2_cpus # noqa: F401 from ray.experimental.sgd.pytorch import PyTorchTrainer, Resources @@ -15,9 +15,8 @@ model_creator, optimizer_creator, data_creator) -# Distributed PyTorch doesn't work with macOS, so test with only 1 replica @pytest.mark.parametrize( # noqa: F811 - "num_replicas", [1] if sys.platform == "darwin" else [1, 2]) + "num_replicas", [1, 2] if dist.is_available() else [1]) def test_train(ray_start_2_cpus, num_replicas): # noqa: F811 trainer = PyTorchTrainer( model_creator, @@ -38,9 +37,8 @@ def test_train(ray_start_2_cpus, num_replicas): # noqa: F811 assert validation_loss2 <= validation_loss1 -# Distributed PyTorch doesn't work with macOS, so test with only 1 replica @pytest.mark.parametrize( # noqa: F811 - "num_replicas", [1] if sys.platform == "darwin" else [1, 2]) + "num_replicas", [1, 2] if dist.is_available() else [1]) def test_save_and_restore(ray_start_2_cpus, num_replicas): # noqa: F811 trainer1 = PyTorchTrainer( model_creator, From 0903c77cfa715a23608e273780f7b744ea4ec87a Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Wed, 5 Jun 2019 11:18:18 +0200 Subject: [PATCH 3/7] Nicer exception --- python/ray/experimental/sgd/pytorch/pytorch_trainer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py index b37468b27f84..0055229a985e 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py @@ -55,8 +55,9 @@ def __init__(self, # TODO: add support for callbacks if num_replicas > 1 and not dist.is_available(): raise Exception( - ("Distributed PyTorch is not supported on macOS. For more " - "information, see " + ("Distributed PyTorch is not supported on macOS. " + "To run without distributed PyTorch, set 'num_replicas=1'. 
" + "For more information, see " "https://github.com/pytorch/examples/issues/467.")) self.model_creator = model_creator From 5def33658d5359bb4ba7dea6331b22c553bbdce8 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Thu, 6 Jun 2019 09:48:13 +0200 Subject: [PATCH 4/7] Fix bug in choosing port --- python/ray/experimental/sgd/pytorch/pytorch_runner.py | 4 ++++ python/ray/experimental/sgd/pytorch/pytorch_trainer.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/python/ray/experimental/sgd/pytorch/pytorch_runner.py b/python/ray/experimental/sgd/pytorch/pytorch_runner.py index bfa1fc54386f..8527ca21aa34 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_runner.py @@ -84,6 +84,10 @@ def get_node_ip(self): """Returns the IP address of the current node""" return ray.services.get_node_ip_address() + def find_free_port(self): + """Finds a fee port on the curent node""" + return utils.find_free_port() + def step(self): """Runs a training epoch and updates the model parameters""" logger.debug("Begin Training Epoch {}".format(self.epoch + 1)) diff --git a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py index 0055229a985e..f866bb6c261a 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py @@ -104,7 +104,7 @@ def __init__(self, ] ip = ray.get(self.workers[0].get_node_ip.remote()) - port = utils.find_free_port() + port = ray.get(self.workers[0].find_free_port.remote()) address = "tcp://{ip}:{port}".format(ip=ip, port=port) # Get setup tasks in order to throw errors on failure From e250fd64de185ea5f28e7c619453ce3f2e547570 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Thu, 6 Jun 2019 10:05:38 +0200 Subject: [PATCH 5/7] Refactor some code --- .../sgd/pytorch/pytorch_trainer.py | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py index f866bb6c261a..dc555a19853b 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py @@ -72,48 +72,51 @@ def __init__(self, backend = "nccl" if resources_per_replica.num_gpus > 0 else "gloo" if num_replicas == 1: + # Generate actor class Runner = ray.remote( num_cpus=resources_per_replica.num_cpus, num_gpus=resources_per_replica.num_gpus, resources=resources_per_replica.resources)(PyTorchRunner) + # Start workers + self.workers = [ + Runner.remote(model_creator, data_creator, optimizer_creator, + self.config, batch_size) + ] + # Get setup tasks in order to throw errors on failure + ray.get(self.workers[0].setup.remote()) else: + # Geneate actor class Runner = ray.remote( num_cpus=resources_per_replica.num_cpus, num_gpus=resources_per_replica.num_gpus, resources=resources_per_replica.resources)( DistributedPyTorchRunner) - - batch_size_per_replica = batch_size // num_replicas - if batch_size % num_replicas > 0: - new_batch_size = batch_size_per_replica * num_replicas - logger.warn( - ("Changing batch size from {old_batch_size} to " - "{new_batch_size} to evenly distribute batches across " - "{num_replicas} replicas.").format( - old_batch_size=batch_size, - new_batch_size=new_batch_size, - num_replicas=num_replicas)) - - self.workers = [ - Runner.remote(model_creator, data_creator, optimizer_creator, - self.config, 
batch_size_per_replica) - if num_replicas == 1 else - Runner.remote(model_creator, data_creator, optimizer_creator, - self.config, batch_size_per_replica, backend) - for i in range(num_replicas) - ] - - ip = ray.get(self.workers[0].get_node_ip.remote()) - port = ray.get(self.workers[0].find_free_port.remote()) - address = "tcp://{ip}:{port}".format(ip=ip, port=port) - - # Get setup tasks in order to throw errors on failure - ray.get([ - worker.setup.remote() - if num_replicas == 1 else worker.setup.remote( - address, i, len(self.workers)) - for i, worker in enumerate(self.workers) - ]) + # Compute batch size per replica + batch_size_per_replica = batch_size // num_replicas + if batch_size % num_replicas > 0: + new_batch_size = batch_size_per_replica * num_replicas + logger.warn( + ("Changing batch size from {old_batch_size} to " + "{new_batch_size} to evenly distribute batches across " + "{num_replicas} replicas.").format( + old_batch_size=batch_size, + new_batch_size=new_batch_size, + num_replicas=num_replicas)) + # Start workers + self.workers = [ + Runner.remote(model_creator, data_creator, optimizer_creator, + self.config, batch_size_per_replica, backend) + for i in range(num_replicas) + ] + # Compute URL for initializing distributed PyTorch + ip = ray.get(self.workers[0].get_node_ip.remote()) + port = ray.get(self.workers[0].find_free_port.remote()) + address = "tcp://{ip}:{port}".format(ip=ip, port=port) + # Get setup tasks in order to throw errors on failure + ray.get([ + worker.setup.remote(address, i, len(self.workers)) + for i, worker in enumerate(self.workers) + ]) def train(self): """Runs a training epoch""" From 60542695b96ff904fd098bbc41bd7c08aed426a8 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Fri, 7 Jun 2019 09:22:01 +0200 Subject: [PATCH 6/7] Address comments --- .../sgd/pytorch/distributed_pytorch_runner.py | 11 +++---- .../sgd/pytorch/pytorch_runner.py | 30 ++++++++++++------- .../sgd/pytorch/pytorch_trainer.py | 14 ++++----- python/ray/experimental/sgd/pytorch/utils.py | 7 +---- 4 files changed, 33 insertions(+), 29 deletions(-) diff --git a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py index 6bcfedaf9a75..788dd0f41e3b 100644 --- a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py @@ -9,7 +9,7 @@ class DistributedPyTorchRunner(PyTorchRunner): - """Manages a distributed PyTorch model replica""" + """Manages a distributed PyTorch model replica.""" def __init__(self, model_creator, @@ -103,13 +103,13 @@ def _setup_training(self): sampler=self.validation_sampler) def step(self): - """Runs a training epoch and updates the model parameters""" + """Runs a training epoch and updates the model parameters.""" logger.debug("Starting step") self.train_sampler.set_epoch(self.epoch) return super(DistributedPyTorchRunner, self).step() def get_state(self): - """Returns the state of the runner""" + """Returns the state of the runner.""" return { "epoch": self.epoch, "model": self.model.module.state_dict(), @@ -118,12 +118,13 @@ def get_state(self): } def set_state(self, state): - """Sets the state of the model""" + """Sets the state of the model.""" # TODO: restore timer stats self.model.module.load_state_dict(state["model"]) self.optimizer.load_state_dict(state["optimizer"]) self.epoch = state["stats"]["epoch"] def shutdown(self): - """Attempts to shut down the worker""" + """Attempts to shut down the 
worker.""" + super(DistributedPyTorchRunner, self).shutdown() dist.destroy_process_group() diff --git a/python/ray/experimental/sgd/pytorch/pytorch_runner.py b/python/ray/experimental/sgd/pytorch/pytorch_runner.py index 8527ca21aa34..581bcdfa747f 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_runner.py @@ -13,7 +13,7 @@ class PyTorchRunner(object): - """Manages a PyTorch model for training""" + """Manages a PyTorch model for training.""" def __init__(self, model_creator, @@ -52,7 +52,7 @@ def __init__(self, } def setup(self): - """Initializes the model""" + """Initializes the model.""" logger.debug("Creating model") self.model = self.model_creator(self.config) if torch.cuda.is_available(): @@ -81,15 +81,15 @@ def setup(self): pin_memory=False) def get_node_ip(self): - """Returns the IP address of the current node""" + """Returns the IP address of the current node.""" return ray.services.get_node_ip_address() def find_free_port(self): - """Finds a fee port on the curent node""" + """Finds a free port on the current node.""" return utils.find_free_port() def step(self): - """Runs a training epoch and updates the model parameters""" + """Runs a training epoch and updates the model parameters.""" logger.debug("Begin Training Epoch {}".format(self.epoch + 1)) with self._timers["training"]: train_stats = utils.train(self.train_loader, self.model, @@ -102,7 +102,7 @@ def step(self): return train_stats def validate(self): - """Evaluates the model on the validation data set""" + """Evaluates the model on the validation data set.""" with self._timers["validation"]: validation_stats = utils.validate(self.validation_loader, self.model, self.criterion) @@ -111,7 +111,7 @@ def validate(self): return validation_stats def stats(self): - """Returns a dictionary of statistics collected""" + """Returns a dictionary of statistics collected.""" stats = {"epoch": self.epoch} for k, t in self._timers.items(): stats[k + "_time_mean"] = t.mean @@ -120,7 +120,7 @@ def stats(self): return stats def get_state(self): - """Returns the state of the runner""" + """Returns the state of the runner.""" return { "epoch": self.epoch, "model": self.model.state_dict(), @@ -129,12 +129,20 @@ def get_state(self): } def set_state(self, state): - """Sets the state of the model""" + """Sets the state of the model.""" # TODO: restore timer stats self.model.load_state_dict(state["model"]) self.optimizer.load_state_dict(state["optimizer"]) self.epoch = state["stats"]["epoch"] def shutdown(self): - """Attempts to shut down the worker""" - pass + """Attempts to shut down the worker.""" + del self.validation_loader + del self.validation_set + del self.train_loader + del self.training_set + del self.criterion + del self.optimizer + del self.model + if torch.cuda.is_available(): + torch.cuda.empty_cache() diff --git a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py index dc555a19853b..0e0c5d8436a1 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py @@ -54,7 +54,7 @@ def __init__(self, # TODO: add support for mixed precision # TODO: add support for callbacks if num_replicas > 1 and not dist.is_available(): - raise Exception( + raise ValueError( ("Distributed PyTorch is not supported on macOS. " "To run without distributed PyTorch, set 'num_replicas=1'. 
" "For more information, see " @@ -119,7 +119,7 @@ def __init__(self, ]) def train(self): - """Runs a training epoch""" + """Runs a training epoch.""" with self.optimizer_timer: worker_stats = ray.get([w.step.remote() for w in self.workers]) @@ -129,7 +129,7 @@ def train(self): return train_stats def validate(self): - """Evaluates the model on the validation data set""" + """Evaluates the model on the validation data set.""" worker_stats = ray.get([w.validate.remote() for w in self.workers]) validation_stats = worker_stats[0].copy() validation_stats["validation_loss"] = np.mean( @@ -137,25 +137,25 @@ def validate(self): return validation_stats def get_model(self): - """Returns the learned model""" + """Returns the learned model.""" model = self.model_creator(self.config) state = ray.get(self.workers[0].get_state.remote()) model.load_state_dict(state["model"]) return model def save(self, ckpt): - """Saves the model at the provided checkpoint""" + """Saves the model at the provided checkpoint.""" state = ray.get(self.workers[0].get_state.remote()) torch.save(state, ckpt) def restore(self, ckpt): - """Restores the model from the provided checkpoint""" + """Restores the model from the provided checkpoint.""" state = torch.load(ckpt) state_id = ray.put(state) ray.get([worker.set_state.remote(state_id) for worker in self.workers]) def shutdown(self): - """Shuts down workers and releases resources""" + """Shuts down workers and releases resources.""" for worker in self.workers: worker.shutdown.remote() worker.__ray_terminate__.remote() diff --git a/python/ray/experimental/sgd/pytorch/utils.py b/python/ray/experimental/sgd/pytorch/utils.py index aede4ab70bac..5be26b331cfd 100644 --- a/python/ray/experimental/sgd/pytorch/utils.py +++ b/python/ray/experimental/sgd/pytorch/utils.py @@ -196,7 +196,7 @@ def find_free_port(): class AverageMeter(object): - """Computes and stores the average and current value""" + """Computes and stores the average and current value.""" def __init__(self): self.reset() @@ -238,8 +238,3 @@ def sgd_mse_optimizer(model, config): criterion = nn.MSELoss() optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate) return criterion, optimizer - - -def clean_state_dict(state_dict): - """Removes the 'module.' prefix added by PyTorch""" - return {k.replace("module.", ""): v for k, v in state_dict.items()} From bf2bfc898b4dc4ad0b11301ff2c6aa7dc5d44a47 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Mon, 10 Jun 2019 12:59:14 +0200 Subject: [PATCH 7/7] Address comments --- .../sgd/pytorch/distributed_pytorch_runner.py | 19 ++++++++++--------- .../sgd/pytorch/pytorch_runner.py | 13 +++++-------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py index 788dd0f41e3b..160544633353 100644 --- a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py @@ -1,3 +1,7 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + import logging import os import torch.distributed as dist @@ -21,16 +25,13 @@ def __init__(self, """Initializes the runner. Args: - model_creator (dict -> torch.nn.Module): creates the model using - the config. - data_creator (dict -> Dataset, Dataset): creates the training and - validation data sets using the config. 
+ model_creator (dict -> torch.nn.Module): see pytorch_trainer.py. + data_creator (dict -> Dataset, Dataset): see pytorch_trainer.py. optimizer_creator (torch.nn.Module, dict -> loss, optimizer): - creates the loss and optimizer using the model and the config. - config (dict): configuration passed to 'model_creator', - 'data_creator', and 'optimizer_creator'. - batch_size (int): batch size used in an update. - backend (string): backend used by distributed PyTorch. + see pytorch_trainer.py. + config (dict): see pytorch_trainer.py. + batch_size (int): batch size used by one replica for an update. + backend (string): see pytorch_trainer.py. """ super(DistributedPyTorchRunner, self).__init__( model_creator, data_creator, optimizer_creator, config, batch_size) diff --git a/python/ray/experimental/sgd/pytorch/pytorch_runner.py b/python/ray/experimental/sgd/pytorch/pytorch_runner.py index 581bcdfa747f..1663b2c64f0e 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_runner.py @@ -24,15 +24,12 @@ def __init__(self, """Initializes the runner. Args: - model_creator (dict -> torch.nn.Module): creates the model using - the config. - data_creator (dict -> Dataset, Dataset): creates the training and - validation data sets using the config. + model_creator (dict -> torch.nn.Module): see pytorch_trainer.py. + data_creator (dict -> Dataset, Dataset): see pytorch_trainer.py. optimizer_creator (torch.nn.Module, dict -> loss, optimizer): - creates the loss and optimizer using the model and the config. - config (dict): configuration passed to 'model_creator', - 'data_creator', and 'optimizer_creator'. - batch_size (int): batch size used in an update. + see pytorch_trainer.py. + config (dict): see pytorch_trainer.py. + batch_size (int): see pytorch_trainer.py. """ self.model_creator = model_creator
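
Usage sketch for the API this series converges on: `PyTorchTrainer` dispatches to the new non-distributed `PyTorchRunner` when `num_replicas=1` and to `DistributedPyTorchRunner` otherwise. The trainer calls below mirror `test_pytorch.py` from these patches; the toy `model_creator`, `data_creator`, and `optimizer_creator` bodies, the dataset shapes, and the learning-rate value are illustrative assumptions rather than code taken from the series.

```python
import torch
import torch.nn as nn
import torch.utils.data

import ray
from ray.experimental.sgd.pytorch import PyTorchTrainer, Resources


def model_creator(config):
    # Toy regression model; any torch.nn.Module returned here works.
    return nn.Linear(1, 1)


def data_creator(config):
    # Returns (training_set, validation_set) as torch Datasets.
    x = torch.rand(256, 1)
    y = 2 * x + 0.1 * torch.rand(256, 1)
    dataset = torch.utils.data.TensorDataset(x, y)
    return dataset, dataset


def optimizer_creator(model, config):
    # Returns (criterion, optimizer), matching the signature the runners expect.
    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=config.get("lr", 0.01))
    return criterion, optimizer


if __name__ == "__main__":
    ray.init()
    # num_replicas=1 exercises the non-distributed PyTorchRunner path;
    # num_replicas > 1 uses DistributedPyTorchRunner via torch.distributed.
    trainer = PyTorchTrainer(
        model_creator,
        data_creator,
        optimizer_creator,
        num_replicas=1,
        resources_per_replica=Resources(num_cpus=1))
    print(trainer.train()["train_loss"])
    print(trainer.validate()["validation_loss"])
    model = trainer.get_model()  # plain torch.nn.Module with trained weights
    trainer.shutdown()
```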