diff --git a/python/ray/experimental/sgd/examples/example-sgd.yaml b/python/ray/experimental/sgd/examples/example-sgd.yaml index 1c33f888c16b..50766b0c48ab 100644 --- a/python/ray/experimental/sgd/examples/example-sgd.yaml +++ b/python/ray/experimental/sgd/examples/example-sgd.yaml @@ -4,8 +4,8 @@ cluster_name: sgd-pytorch # The maximum number of workers nodes to launch in addition to the head # node. This takes precedence over min_workers. min_workers default to 0. min_workers: 1 -initial_workers: 1 -max_workers: 1 +initial_workers: 7 +max_workers: 7 target_utilization_fraction: 0.9 @@ -27,7 +27,7 @@ auth: head_node: InstanceType: p3.8xlarge - ImageId: ami-0757fc5a639fe7666 + ImageId: ami-0d96d570269578cd7 # InstanceMarketOptions: # MarketType: spot # SpotOptions: @@ -36,7 +36,7 @@ head_node: worker_nodes: InstanceType: p3.8xlarge - ImageId: ami-0757fc5a639fe7666 + ImageId: ami-0d96d570269578cd7 # InstanceMarketOptions: # MarketType: spot # SpotOptions: @@ -47,12 +47,19 @@ worker_nodes: # MarketType: spot setup_commands: - - ray || pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.8.0.dev2-cp36-cp36m-manylinux1_x86_64.whl - - conda install -y pytorch torchvision cudatoolkit=9.0 -c pytorch - - pip install -U ipdb ray[rllib] + # Due to: https://github.com/pytorch/pytorch/issues/23534, torch 1.1 will not work due to NCCL errors. + - conda list -f torch | grep 1.2 || conda install -y pytorch-nightly=1.2.0.dev20190804 cudatoolkit=10.0 -c pytorch + # Since we are installing nightly torch 1.2, we need to build torchvision from source. + # --no-deps is to avoid reverting the above pytorch installation. + # torchvision must be installed after torch to maintain the correct linking of binaries + - pip list | grep torchvision || pip install git+https://github.com/pytorch/vision.git@6a834e983dbb8f6f233f40a301a64c4e1d49738c --no-deps + - ray || pip install -U ray + - pip install filelock ipdb ray[rllib] + - cd python_ray && python setup-dev.py --yes file_mounts: { + ~/python_ray/: /Users/rliaw/Research/riselab/ray/python/ray/ } # Custom commands that will be run on the head node after common setup. diff --git a/python/ray/experimental/sgd/examples/train_example.py b/python/ray/experimental/sgd/examples/train_example.py index 13bd2e9e5741..9f0d2b410fcf 100644 --- a/python/ray/experimental/sgd/examples/train_example.py +++ b/python/ray/experimental/sgd/examples/train_example.py @@ -2,26 +2,85 @@ from __future__ import division from __future__ import print_function +import os +import torch import argparse -from ray.experimental.sgd.pytorch import PyTorchTrainer +from ray import tune +from ray.experimental.sgd.pytorch.pytorch_trainer import (PyTorchTrainer, + PyTorchTrainable) +import ray +from ray.experimental.sgd.pytorch import PyTorchTrainer from ray.experimental.sgd.tests.pytorch_utils import ( - model_creator, optimizer_creator, data_creator) + resnet_creator, xe_optimizer_creator, cifar_creator) + + +def initialization_hook(runner): + print("NCCL DEBUG SET") + # Need this for avoiding a connection restart issue + os.environ["NCCL_SOCKET_IFNAME"] = "^docker0,lo" + os.environ["NCCL_LL_THRESHOLD"] = "0" + os.environ["NCCL_DEBUG"] = "INFO" + + +def train(model, train_iterator, criterion, optimizer): + model.train() + train_loss, total_num, correct = 0, 0, 0 + for batch_idx, (data, target) in enumerate(train_iterator): + # get small model update + if torch.cuda.is_available(): + data, target = data.cuda(), target.cuda() + output = model(data) + loss = criterion(output, target) # / float(large_ratio) + loss.backward() + train_loss += loss.item() * target.size(0) # * float(large_ratio) + total_num += target.size(0) + _, predicted = output.max(1) + correct += predicted.eq(target).sum().item() + optimizer.step() + optimizer.zero_grad() + stats = {"train_loss": train_loss / total_num} + return stats def train_example(num_replicas=1, use_gpu=False): trainer1 = PyTorchTrainer( - model_creator, - data_creator, - optimizer_creator, + resnet_creator, + cifar_creator, + xe_optimizer_creator, + train_function=train, num_replicas=num_replicas, use_gpu=use_gpu, - backend="gloo") + batch_size=2048, + backend="nccl") + stats = trainer1.train() + print(stats) trainer1.train() trainer1.shutdown() print("success!") +def tune_example(num_replicas=1, use_gpu=False): + + # analysis = tune.run(PyTorchTrainable, num_samples=1, config=config) + analysis = tune.run( + PyTorchTrainer.make_trainable( + resnet_creator, + cifar_creator, + xe_optimizer_creator, + nn.MSELoss, + train_function=train_function, + ), + num_samples=12, + config={ + "num_replicas": num_replicas, + "use_gpu": use_gpu + }, + stop={"training_iteration": 10}, + verbose=1) + return analysis.get_best_config(metric="validation_loss", mode="min") + + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( @@ -42,7 +101,5 @@ def train_example(num_replicas=1, use_gpu=False): help="Enables GPU training") args, _ = parser.parse_known_args() - import ray - ray.init(redis_address=args.redis_address) - train_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu) + tune_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu) diff --git a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py index 160544633353..9899d0b735a2 100644 --- a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py @@ -19,6 +19,10 @@ def __init__(self, model_creator, data_creator, optimizer_creator, + loss_creator, + train_function=None, + validation_function=None, + initialization_hook=None, config=None, batch_size=16, backend="gloo"): @@ -34,7 +38,15 @@ def __init__(self, backend (string): see pytorch_trainer.py. """ super(DistributedPyTorchRunner, self).__init__( - model_creator, data_creator, optimizer_creator, config, batch_size) + model_creator, + data_creator, + optimizer_creator, + loss_creator, + train_function=train_function, + validation_function=validation_function, + initialization_hook=initialization_hook, + config=config, + batch_size=batch_size) self.backend = backend def setup(self, url, world_rank, world_size): @@ -49,7 +61,6 @@ def setup(self, url, world_rank, world_size): self._setup_training() def _setup_distributed_pytorch(self, url, world_rank, world_size): - os.environ["CUDA_LAUNCH_BLOCKING"] = "1" with self._timers["setup_proc"]: self.world_rank = world_rank logger.debug( @@ -72,9 +83,9 @@ def _setup_training(self): self.model = torch.nn.parallel.DistributedDataParallelCPU( self.model) - logger.debug("Creating optimizer") - self.criterion, self.optimizer = self.optimizer_creator( - self.model, self.config) + logger.debug("Creating optimizer.") + self.optimizer = self.optimizer_creator(self.model, self.config) + self.criterion = self.loss_creator(**self.config["loss_kwargs"]) if torch.cuda.is_available(): self.criterion = self.criterion.cuda() diff --git a/python/ray/experimental/sgd/pytorch/pytorch_runner.py b/python/ray/experimental/sgd/pytorch/pytorch_runner.py index 1663b2c64f0e..0ebd43beedf7 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_runner.py @@ -19,6 +19,10 @@ def __init__(self, model_creator, data_creator, optimizer_creator, + loss_creator, + train_function=None, + validation_function=None, + initialization_hook=None, config=None, batch_size=16): """Initializes the runner. @@ -31,11 +35,15 @@ def __init__(self, config (dict): see pytorch_trainer.py. batch_size (int): see pytorch_trainer.py. """ - + if initialization_hook: + initialization_hook(self) self.model_creator = model_creator self.data_creator = data_creator self.optimizer_creator = optimizer_creator + self.loss_creator = loss_creator self.config = {} if config is None else config + self.train_function = train_function or utils.train + self.validation_function = validation_function or utils.validate self.batch_size = batch_size self.verbose = True @@ -56,8 +64,8 @@ def setup(self): self.model = self.model.cuda() logger.debug("Creating optimizer") - self.criterion, self.optimizer = self.optimizer_creator( - self.model, self.config) + self.optimizer = self.optimizer_creator(self.model, self.config) + self.criterion = self.loss_creator(**self.config["loss_kwargs"]) if torch.cuda.is_available(): self.criterion = self.criterion.cuda() @@ -89,8 +97,8 @@ def step(self): """Runs a training epoch and updates the model parameters.""" logger.debug("Begin Training Epoch {}".format(self.epoch + 1)) with self._timers["training"]: - train_stats = utils.train(self.train_loader, self.model, - self.criterion, self.optimizer) + train_stats = self.train_function(self.model, self.train_loader, + self.criterion, self.optimizer) train_stats["epoch"] = self.epoch self.epoch += 1 @@ -101,8 +109,8 @@ def step(self): def validate(self): """Evaluates the model on the validation data set.""" with self._timers["validation"]: - validation_stats = utils.validate(self.validation_loader, - self.model, self.criterion) + validation_stats = self.validation_function( + self.model, self.validation_loader, self.criterion) validation_stats.update(self.stats()) return validation_stats diff --git a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py index 95345461956d..ddebb33ac971 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py @@ -3,12 +3,15 @@ from __future__ import print_function import numpy as np +import os import torch import torch.distributed as dist import logging import ray +from ray.tune import Trainable +from ray.tune.resources import Resources from ray.experimental.sgd.pytorch.pytorch_runner import PyTorchRunner from ray.experimental.sgd.pytorch.distributed_pytorch_runner import ( DistributedPyTorchRunner) @@ -28,6 +31,8 @@ def __init__(self, model_creator, data_creator, optimizer_creator=utils.sgd_mse_optimizer, + train_function=None, + validation_function=None, config=None, num_replicas=1, use_gpu=False, @@ -75,8 +80,14 @@ def __init__(self, num_cpus=1, num_gpus=int(use_gpu))(PyTorchRunner) # Start workers self.workers = [ - Runner.remote(model_creator, data_creator, optimizer_creator, - self.config, batch_size) + Runner.remote( + model_creator, + data_creator, + optimizer_creator, + train_function=train_function, + validation_function=validation_function, + config=self.config, + batch_size=batch_size) ] # Get setup tasks in order to throw errors on failure ray.get(self.workers[0].setup.remote()) @@ -97,8 +108,15 @@ def __init__(self, num_replicas=num_replicas)) # Start workers self.workers = [ - Runner.remote(model_creator, data_creator, optimizer_creator, - self.config, batch_size_per_replica, backend) + Runner.remote( + model_creator, + data_creator, + optimizer_creator, + backend=backend, + train_function=train_function, + validation_function=validation_function, + config=self.config, + batch_size=batch_size_per_replica) for i in range(num_replicas) ] # Compute URL for initializing distributed PyTorch @@ -136,14 +154,25 @@ def get_model(self): model.load_state_dict(state["model"]) return model - def save(self, ckpt): - """Saves the model at the provided checkpoint.""" + def save(self, checkpoint): + """Saves the model at the provided checkpoint. + + Args: + checkpoint (str): Path to target checkpoint file. + + """ state = ray.get(self.workers[0].get_state.remote()) - torch.save(state, ckpt) + torch.save(state, checkpoint) + return checkpoint + + def restore(self, checkpoint): + """Restores the model from the provided checkpoint. - def restore(self, ckpt): - """Restores the model from the provided checkpoint.""" - state = torch.load(ckpt) + Args: + checkpoint (str): Path to target checkpoint file. + + """ + state = torch.load(checkpoint) state_id = ray.put(state) ray.get([worker.set_state.remote(state_id) for worker in self.workers]) @@ -152,3 +181,41 @@ def shutdown(self): for worker in self.workers: worker.shutdown.remote() worker.__ray_terminate__.remote() + + +class PyTorchTrainable(Trainable): + @classmethod + def default_resource_request(cls, config): + return Resources( + cpu=0, + gpu=0, + extra_cpu=config["num_replicas"], + extra_gpu=int(config["use_gpu"]) * config["num_replicas"]) + + def _setup(self, config): + self._trainer = PyTorchTrainer( + model_creator=config["model_creator"], + data_creator=config["data_creator"], + optimizer_creator=config["optimizer_creator"], + config=config, + num_replicas=config["num_replicas"], + use_gpu=config["use_gpu"]) + + def _train(self): + + train_stats = self._trainer.train() + validation_stats = self._trainer.validate() + + train_stats.update(validation_stats) + + # output {"mean_loss": test_loss, "mean_accuracy": accuracy} + return train_stats + + def _save(self, checkpoint_dir): + return self._trainer.save(os.path.join(checkpoint_dir, "model.pth")) + + def _restore(self, checkpoint_path): + return self._trainer.restore(checkpoint_path) + + def _stop(self): + self._trainer.shutdown() diff --git a/python/ray/experimental/sgd/pytorch/utils.py b/python/ray/experimental/sgd/pytorch/utils.py index f16afe438386..817357c1e0df 100644 --- a/python/ray/experimental/sgd/pytorch/utils.py +++ b/python/ray/experimental/sgd/pytorch/utils.py @@ -10,7 +10,7 @@ import torch.nn as nn -def train(train_iterator, model, criterion, optimizer): +def train(model, train_iterator, criterion, optimizer): """Runs 1 training epoch""" batch_time = AverageMeter() data_time = AverageMeter() @@ -64,7 +64,7 @@ def train(train_iterator, model, criterion, optimizer): return stats -def validate(val_loader, model, criterion): +def validation(model, val_iterator, criterion): batch_time = AverageMeter() losses = AverageMeter() @@ -73,7 +73,7 @@ def validate(val_loader, model, criterion): with torch.no_grad(): end = time.time() - for i, (features, target) in enumerate(val_loader): + for i, (features, target) in enumerate(val_iterator): if torch.cuda.is_available(): features = features.cuda(non_blocking=True) @@ -211,17 +211,3 @@ def update(self, val, n=1): self.sum += val * n self.count += n self.avg = self.sum / self.count - - -def sgd_mse_optimizer(model, config): - """Returns the mean squared error criterion and SGD optimizer. - - Args: - model (torch.nn.Module): the model to optimize. - config (dict): configuration for the optimizer. - lr (float): the learning rate. defaults to 0.01. - """ - learning_rate = config.get("lr", 0.01) - criterion = nn.MSELoss() - optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate) - return criterion, optimizer diff --git a/python/ray/experimental/sgd/tests/pytorch_utils.py b/python/ray/experimental/sgd/tests/pytorch_utils.py index 6299fff1c13c..9aec45d0adce 100644 --- a/python/ray/experimental/sgd/tests/pytorch_utils.py +++ b/python/ray/experimental/sgd/tests/pytorch_utils.py @@ -2,10 +2,14 @@ from __future__ import division from __future__ import print_function +import os import numpy as np import torch import torch.nn as nn import torch.utils.data +import torchvision +import torchvision.transforms as transforms +import torchvision.models as models class LinearDataset(torch.utils.data.Dataset): @@ -30,11 +34,40 @@ def model_creator(config): def optimizer_creator(model, config): """Returns criterion, optimizer""" - criterion = nn.MSELoss() - optimizer = torch.optim.SGD(model.parameters(), lr=1e-4) - return criterion, optimizer + return torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4)) def data_creator(config): """Returns training set, validation set""" return LinearDataset(2, 5), LinearDataset(2, 5, size=400) + + +def resnet_creator(config): + model_cls = models.__dict__["resnet50"] + return model_cls(pretrained=False) + + +def cifar_creator(config): + transform_train = transforms.Compose([ + transforms.RandomCrop(32, padding=4), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize((0.4914, 0.4822, 0.4465), + (0.2023, 0.1994, 0.2010)), + ]) # meanstd transformation + + transform_test = transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.4914, 0.4822, 0.4465), + (0.2023, 0.1994, 0.2010)), + ]) + from filelock import FileLock + with FileLock(os.path.expanduser("~/data.lock")): + trainset = torchvision.datasets.CIFAR10( + root="~/data", + train=True, + download=True, + transform=transform_train) + valset = torchvision.datasets.CIFAR10( + root="~/data", train=False, download=False, transform=transform_test) + return trainset, valset