Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
947adb6
[sgd] Replaced class Resources to the one in ray.tune.trial
jichan3751 Jul 23, 2019
192db10
Merge branch 'master' into remove_dup_resource
jichan3751 Jul 26, 2019
8dfae61
moved Resources from ray.tune.trial to new file ray.tune.resources
jichan3751 Jul 26, 2019
8900d08
added new file ray/tune/resources.py
jichan3751 Jul 26, 2019
a17620a
fix bugs
jichan3751 Jul 26, 2019
c8efb95
fixed bugs after running test_pytorch.py
jichan3751 Jul 26, 2019
8976b3b
Fix lint
richardliaw Jul 30, 2019
648111c
Merge branch 'master' into remove_dup_resource
richardliaw Jul 30, 2019
ff69de3
Set use_gpu
richardliaw Jul 30, 2019
234adbb
lint
richardliaw Jul 30, 2019
adac4ec
lint
richardliaw Jul 30, 2019
7140c78
Initial commit for trainable
richardliaw Jul 30, 2019
75b74a1
fix
richardliaw Jul 30, 2019
67225b4
fix
richardliaw Jul 30, 2019
68abf2a
fix
richardliaw Jul 30, 2019
844ba58
use_gpu - erroring at multinode execution
richardliaw Jul 30, 2019
18d2dc5
bash
richardliaw Jul 30, 2019
c901443
info messages
richardliaw Jul 30, 2019
ec9a521
env vars
richardliaw Jul 30, 2019
4474ce0
prints
richardliaw Jul 30, 2019
1d3ee08
remove dup
richardliaw Jul 31, 2019
24a258b
lint
richardliaw Jul 31, 2019
08b3b36
fix
richardliaw Jul 31, 2019
04828e6
Revert changes for debugging.
richardliaw Jul 31, 2019
c655bdf
backend
richardliaw Jul 31, 2019
e693521
nit
richardliaw Jul 31, 2019
ebc063d
hess
richardliaw Aug 1, 2019
ae92a41
Merge branch 'master' into pytorch_trainable
richardliaw Aug 1, 2019
eef8167
added pytorch_trainable, not yet working code
jichan3751 Aug 2, 2019
24925cc
pytorch trainable now works. Not yet tested distributed
jichan3751 Aug 2, 2019
895811f
Merge branch 'master' into HessianFlow
richardliaw Aug 4, 2019
bac2922
nit
richardliaw Aug 4, 2019
be26ce2
py
richardliaw Aug 5, 2019
bb2cad9
lint
richardliaw Aug 5, 2019
bb0f422
goodyaml
richardliaw Aug 5, 2019
f864329
roots
richardliaw Aug 5, 2019
4f63b8d
lint
richardliaw Aug 5, 2019
41d93fd
revert
richardliaw Aug 5, 2019
0eaab9f
Merge branch 'pytorch_trainable' into HessianFlow
richardliaw Aug 6, 2019
a6e86b4
Fix up loss creator
richardliaw Aug 6, 2019
72cec4c
lint
richardliaw Aug 6, 2019
fe0cdba
Merge branch 'master' into HessianFlow
richardliaw Aug 7, 2019
39e3cb3
nit
richardliaw Aug 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions python/ray/experimental/sgd/examples/example-sgd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ cluster_name: sgd-pytorch
# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers. min_workers defaults to 0.
min_workers: 1
initial_workers: 1
max_workers: 1
initial_workers: 7
max_workers: 7

target_utilization_fraction: 0.9

Expand All @@ -27,7 +27,7 @@ auth:

head_node:
InstanceType: p3.8xlarge
ImageId: ami-0757fc5a639fe7666
ImageId: ami-0d96d570269578cd7
# InstanceMarketOptions:
# MarketType: spot
# SpotOptions:
Expand All @@ -36,7 +36,7 @@ head_node:

worker_nodes:
InstanceType: p3.8xlarge
ImageId: ami-0757fc5a639fe7666
ImageId: ami-0d96d570269578cd7
# InstanceMarketOptions:
# MarketType: spot
# SpotOptions:
Expand All @@ -47,12 +47,19 @@ worker_nodes:
# MarketType: spot

setup_commands:
- ray || pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.8.0.dev2-cp36-cp36m-manylinux1_x86_64.whl
- conda install -y pytorch torchvision cudatoolkit=9.0 -c pytorch
- pip install -U ipdb ray[rllib]
# Due to: https://github.com/pytorch/pytorch/issues/23534, torch 1.1 will not work due to NCCL errors.
- conda list -f torch | grep 1.2 || conda install -y pytorch-nightly=1.2.0.dev20190804 cudatoolkit=10.0 -c pytorch
# Since we are installing nightly torch 1.2, we need to build torchvision from source.
# --no-deps is to avoid reverting the above pytorch installation.
# torchvision must be installed after torch to maintain the correct linking of binaries
- pip list | grep torchvision || pip install git+https://github.com/pytorch/vision.git@6a834e983dbb8f6f233f40a301a64c4e1d49738c --no-deps
- ray || pip install -U ray
- pip install filelock ipdb ray[rllib]
- cd python_ray && python setup-dev.py --yes


file_mounts: {
~/python_ray/: /Users/rliaw/Research/riselab/ray/python/ray/
}

# Custom commands that will be run on the head node after common setup.
Expand Down
75 changes: 66 additions & 9 deletions python/ray/experimental/sgd/examples/train_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,85 @@
from __future__ import division
from __future__ import print_function

import os
import torch
import argparse
from ray.experimental.sgd.pytorch import PyTorchTrainer
from ray import tune
from ray.experimental.sgd.pytorch.pytorch_trainer import (PyTorchTrainer,
PyTorchTrainable)

import ray
from ray.experimental.sgd.pytorch import PyTorchTrainer
from ray.experimental.sgd.tests.pytorch_utils import (
model_creator, optimizer_creator, data_creator)
resnet_creator, xe_optimizer_creator, cifar_creator)


def initialization_hook(runner):
    """Export the NCCL environment variables used by this example.

    Args:
        runner: The runner being initialized. It is not used here.
    """
    print("NCCL DEBUG SET")
    # Need this for avoiding a connection restart issue
    nccl_settings = {
        "NCCL_SOCKET_IFNAME": "^docker0,lo",
        "NCCL_LL_THRESHOLD": "0",
        "NCCL_DEBUG": "INFO",
    }
    os.environ.update(nccl_settings)


def train(model, train_iterator, criterion, optimizer):
    """Run one pass over ``train_iterator`` and report the training loss.

    Args:
        model: Module to optimize. It is switched to training mode.
        train_iterator: Iterable yielding ``(data, target)`` batches.
        criterion: Callable mapping ``(output, target)`` to a scalar loss.
        optimizer: Optimizer that updates the parameters of ``model``.

    Returns:
        dict: ``{"train_loss": value}`` where ``value`` is the sum of each
            batch loss times its batch size, divided by the total number
            of samples seen.

    Raises:
        ValueError: If ``train_iterator`` yields no samples.
    """
    model.train()
    use_cuda = torch.cuda.is_available()
    train_loss, total_num = 0, 0
    for data, target in train_iterator:
        if use_cuda:
            data, target = data.cuda(), target.cuda()
        # Clear gradients before the backward pass so that nothing left
        # over from an earlier backward call leaks into this update.
        optimizer.zero_grad()
        loss = criterion(model(data), target)
        loss.backward()
        optimizer.step()
        num_samples = target.size(0)
        # Scale by batch size so batches of different sizes contribute
        # proportionally to the reported value.
        train_loss += loss.item() * num_samples
        total_num += num_samples
    if total_num == 0:
        raise ValueError("train_iterator yielded no samples")
    return {"train_loss": train_loss / total_num}


def train_example(num_replicas=1, use_gpu=False):
# Builds a PyTorchTrainer, runs training, prints the returned stats and
# shuts the trainer down before printing "success!".
# NOTE(review): this span comes from a rendered diff, so removed and added
# lines appear together (two creator triples, and both the "gloo" and
# "nccl" backend arguments) -- confirm which lines are in the current file.
trainer1 = PyTorchTrainer(
model_creator,
data_creator,
optimizer_creator,
resnet_creator,
cifar_creator,
xe_optimizer_creator,
train_function=train,
num_replicas=num_replicas,
use_gpu=use_gpu,
backend="gloo")
batch_size=2048,
backend="nccl")
stats = trainer1.train()
print(stats)
trainer1.train()
trainer1.shutdown()
print("success!")


def tune_example(num_replicas=1, use_gpu=False):
    """Run a Tune experiment over a trainable built from PyTorchTrainer.

    Args:
        num_replicas (int): Forwarded to every trial as
            ``config["num_replicas"]``.
        use_gpu (bool): Forwarded to every trial as ``config["use_gpu"]``.

    Returns:
        The result of ``get_best_config`` on the analysis object returned
        by ``tune.run``, selecting on minimum ``validation_loss``.
    """
    trainable = PyTorchTrainer.make_trainable(
        resnet_creator,
        cifar_creator,
        xe_optimizer_creator,
        # ``torch`` is the module this file imports; a bare ``nn`` is not
        # in scope.
        # NOTE(review): MSELoss next to a CIFAR/cross-entropy setup looks
        # suspicious -- confirm the intended loss.
        torch.nn.MSELoss,
        # ``train`` is the training loop defined in this file.
        train_function=train,
    )
    analysis = tune.run(
        trainable,
        num_samples=12,
        config={
            "num_replicas": num_replicas,
            "use_gpu": use_gpu
        },
        stop={"training_iteration": 10},
        verbose=1)
    return analysis.get_best_config(metric="validation_loss", mode="min")


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
Expand All @@ -42,7 +101,5 @@ def train_example(num_replicas=1, use_gpu=False):
help="Enables GPU training")
args, _ = parser.parse_known_args()

import ray

ray.init(redis_address=args.redis_address)
train_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu)
tune_example(num_replicas=args.num_replicas, use_gpu=args.use_gpu)
21 changes: 16 additions & 5 deletions python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def __init__(self,
model_creator,
data_creator,
optimizer_creator,
loss_creator,
train_function=None,
validation_function=None,
initialization_hook=None,
config=None,
batch_size=16,
backend="gloo"):
Expand All @@ -34,7 +38,15 @@ def __init__(self,
backend (string): see pytorch_trainer.py.
"""
super(DistributedPyTorchRunner, self).__init__(
model_creator, data_creator, optimizer_creator, config, batch_size)
model_creator,
data_creator,
optimizer_creator,
loss_creator,
train_function=train_function,
validation_function=validation_function,
initialization_hook=initialization_hook,
config=config,
batch_size=batch_size)
self.backend = backend

def setup(self, url, world_rank, world_size):
Expand All @@ -49,7 +61,6 @@ def setup(self, url, world_rank, world_size):
self._setup_training()

def _setup_distributed_pytorch(self, url, world_rank, world_size):
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
with self._timers["setup_proc"]:
self.world_rank = world_rank
logger.debug(
Expand All @@ -72,9 +83,9 @@ def _setup_training(self):
self.model = torch.nn.parallel.DistributedDataParallelCPU(
self.model)

logger.debug("Creating optimizer")
self.criterion, self.optimizer = self.optimizer_creator(
self.model, self.config)
logger.debug("Creating optimizer.")
self.optimizer = self.optimizer_creator(self.model, self.config)
self.criterion = self.loss_creator(**self.config["loss_kwargs"])
if torch.cuda.is_available():
self.criterion = self.criterion.cuda()

Expand Down
22 changes: 15 additions & 7 deletions python/ray/experimental/sgd/pytorch/pytorch_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def __init__(self,
model_creator,
data_creator,
optimizer_creator,
loss_creator,
train_function=None,
validation_function=None,
initialization_hook=None,
config=None,
batch_size=16):
"""Initializes the runner.
Expand All @@ -31,11 +35,15 @@ def __init__(self,
config (dict): see pytorch_trainer.py.
batch_size (int): see pytorch_trainer.py.
"""

if initialization_hook:
initialization_hook(self)
self.model_creator = model_creator
self.data_creator = data_creator
self.optimizer_creator = optimizer_creator
self.loss_creator = loss_creator
self.config = {} if config is None else config
self.train_function = train_function or utils.train
self.validation_function = validation_function or utils.validate
self.batch_size = batch_size
self.verbose = True

Expand All @@ -56,8 +64,8 @@ def setup(self):
self.model = self.model.cuda()

logger.debug("Creating optimizer")
self.criterion, self.optimizer = self.optimizer_creator(
self.model, self.config)
self.optimizer = self.optimizer_creator(self.model, self.config)
self.criterion = self.loss_creator(**self.config["loss_kwargs"])
if torch.cuda.is_available():
self.criterion = self.criterion.cuda()

Expand Down Expand Up @@ -89,8 +97,8 @@ def step(self):
"""Runs a training epoch and updates the model parameters."""
logger.debug("Begin Training Epoch {}".format(self.epoch + 1))
with self._timers["training"]:
train_stats = utils.train(self.train_loader, self.model,
self.criterion, self.optimizer)
train_stats = self.train_function(self.model, self.train_loader,
self.criterion, self.optimizer)
train_stats["epoch"] = self.epoch

self.epoch += 1
Expand All @@ -101,8 +109,8 @@ def step(self):
def validate(self):
"""Evaluates the model on the validation data set."""
# NOTE(review): this span comes from a rendered diff; the utils.validate
# call and the self.validation_function call below appear to be the old
# and new versions of the same statement -- confirm which one is current.
with self._timers["validation"]:
validation_stats = utils.validate(self.validation_loader,
self.model, self.criterion)
validation_stats = self.validation_function(
self.model, self.validation_loader, self.criterion)

# Merge the runner's own stats() into the returned dict.
validation_stats.update(self.stats())
return validation_stats
Expand Down
87 changes: 77 additions & 10 deletions python/ray/experimental/sgd/pytorch/pytorch_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
from __future__ import print_function

import numpy as np
import os
import torch
import torch.distributed as dist
import logging

import ray

from ray.tune import Trainable
from ray.tune.resources import Resources
from ray.experimental.sgd.pytorch.pytorch_runner import PyTorchRunner
from ray.experimental.sgd.pytorch.distributed_pytorch_runner import (
DistributedPyTorchRunner)
Expand All @@ -28,6 +31,8 @@ def __init__(self,
model_creator,
data_creator,
optimizer_creator=utils.sgd_mse_optimizer,
train_function=None,
validation_function=None,
config=None,
num_replicas=1,
use_gpu=False,
Expand Down Expand Up @@ -75,8 +80,14 @@ def __init__(self,
num_cpus=1, num_gpus=int(use_gpu))(PyTorchRunner)
# Start workers
self.workers = [
Runner.remote(model_creator, data_creator, optimizer_creator,
self.config, batch_size)
Runner.remote(
model_creator,
data_creator,
optimizer_creator,
train_function=train_function,
validation_function=validation_function,
config=self.config,
batch_size=batch_size)
]
# Get setup tasks in order to throw errors on failure
ray.get(self.workers[0].setup.remote())
Expand All @@ -97,8 +108,15 @@ def __init__(self,
num_replicas=num_replicas))
# Start workers
self.workers = [
Runner.remote(model_creator, data_creator, optimizer_creator,
self.config, batch_size_per_replica, backend)
Runner.remote(
model_creator,
data_creator,
optimizer_creator,
backend=backend,
train_function=train_function,
validation_function=validation_function,
config=self.config,
batch_size=batch_size_per_replica)
for i in range(num_replicas)
]
# Compute URL for initializing distributed PyTorch
Expand Down Expand Up @@ -136,14 +154,25 @@ def get_model(self):
model.load_state_dict(state["model"])
return model

def save(self, ckpt):
"""Saves the model at the provided checkpoint."""
def save(self, checkpoint):
"""Saves the model at the provided checkpoint.

Args:
checkpoint (str): Path to target checkpoint file.

"""
state = ray.get(self.workers[0].get_state.remote())
torch.save(state, ckpt)
torch.save(state, checkpoint)
return checkpoint

def restore(self, checkpoint):
"""Restores the model from the provided checkpoint.

def restore(self, ckpt):
"""Restores the model from the provided checkpoint."""
state = torch.load(ckpt)
Args:
checkpoint (str): Path to target checkpoint file.

"""
state = torch.load(checkpoint)
state_id = ray.put(state)
ray.get([worker.set_state.remote(state_id) for worker in self.workers])

Expand All @@ -152,3 +181,41 @@ def shutdown(self):
for worker in self.workers:
worker.shutdown.remote()
worker.__ray_terminate__.remote()


class PyTorchTrainable(Trainable):
    """Tune ``Trainable`` that delegates its work to a ``PyTorchTrainer``.

    The trial config must contain ``model_creator``, ``data_creator`` and
    ``optimizer_creator``. ``num_replicas`` and ``use_gpu`` are optional
    and fall back to the ``PyTorchTrainer`` defaults (1 and False).
    """

    @classmethod
    def default_resource_request(cls, config):
        """Return the resources one trial needs for its replicas.

        Args:
            config (dict): Trial config; ``num_replicas`` and ``use_gpu``
                are read from it when present.

        Returns:
            Resources: No CPU/GPU for the trainable itself, one extra CPU
                per replica, and one extra GPU per replica if ``use_gpu``.
        """
        num_replicas = config.get("num_replicas", 1)
        use_gpu = config.get("use_gpu", False)
        return Resources(
            cpu=0,
            gpu=0,
            extra_cpu=num_replicas,
            extra_gpu=int(use_gpu) * num_replicas)

    def _setup(self, config):
        """Create the underlying ``PyTorchTrainer`` from the trial config.

        Args:
            config (dict): Trial config; also passed through to the
                trainer as its ``config``.
        """
        self._trainer = PyTorchTrainer(
            model_creator=config["model_creator"],
            data_creator=config["data_creator"],
            optimizer_creator=config["optimizer_creator"],
            config=config,
            num_replicas=config.get("num_replicas", 1),
            use_gpu=config.get("use_gpu", False))

    def _train(self):
        """Run one train step and one validation step.

        Returns:
            dict: Training stats updated with the validation stats, so
                validation values win when both contain the same key.
        """
        train_stats = self._trainer.train()
        validation_stats = self._trainer.validate()

        train_stats.update(validation_stats)
        return train_stats

    def _save(self, checkpoint_dir):
        """Save the trainer state to ``model.pth`` in ``checkpoint_dir``."""
        return self._trainer.save(os.path.join(checkpoint_dir, "model.pth"))

    def _restore(self, checkpoint_path):
        """Restore the trainer state from ``checkpoint_path``."""
        return self._trainer.restore(checkpoint_path)

    def _stop(self):
        """Shut down the underlying trainer."""
        self._trainer.shutdown()
Loading