Merged

Changes from all commits
48 commits
6527abf
Change the log syncing behavior
hartikainen Mar 21, 2019
e760991
Merge branch 'master' into bunch-of-log-sync-fixes
richardliaw Apr 4, 2019
2ada6db
fix up abstractions for syncer
richardliaw Apr 4, 2019
26fe09b
Finished checkpoint syncing
richardliaw Apr 4, 2019
f45110d
Code
richardliaw Apr 4, 2019
ee5c61d
Set of changes to get things running
richardliaw Apr 30, 2019
045bfa4
Fixes for log syncing
hartikainen May 1, 2019
c5b1731
Merge branch 'master' into bunch-of-log-sync-fixes
richardliaw May 16, 2019
7d7ced1
Fix parts
richardliaw May 16, 2019
5ce47d7
Merge branch 'tune-submit-fix' into bunch-of-log-sync-fixes
richardliaw May 16, 2019
979a04c
Lint and other fixes
richardliaw May 16, 2019
91dad93
fix some test
richardliaw May 16, 2019
e3ecc72
Remove extra parsing functionality
richardliaw May 16, 2019
26a538f
Merge branch 'tune-relax-configs' into bunch-of-log-sync-fixes
richardliaw May 16, 2019
b0f6218
some test fixes
richardliaw May 17, 2019
5ca8eca
Fix up cloud syncing
richardliaw May 17, 2019
b7fd1e9
Another thing to do
richardliaw May 17, 2019
1ad642a
Merge branch 'master' into bunch-of-log-sync-fixes
richardliaw Jun 12, 2019
5bc70af
Fix up tests and local sync
richardliaw Jun 14, 2019
2b6d21f
Fix up tests, start on local migration
richardliaw Jun 14, 2019
f11800b
Merge branch 'master' into bunch-of-log-sync-fixes
richardliaw Jun 25, 2019
ed015f6
fix distributed migrations
richardliaw Jun 26, 2019
a39279e
comments
richardliaw Jun 26, 2019
368f90b
formatting
richardliaw Jun 26, 2019
2e38543
Better checkpoint directory handling
richardliaw Jun 26, 2019
099edb9
fix tests
richardliaw Jun 26, 2019
9d38c10
fix tests
richardliaw Jun 27, 2019
10302af
fix click
richardliaw Jun 27, 2019
2fcdfa8
comments
richardliaw Jun 27, 2019
7324426
formatting comments
richardliaw Jun 27, 2019
77735c5
formatting and comments
richardliaw Jun 27, 2019
b4b5937
Merge branch 'master' into bunch-of-log-sync-fixes
richardliaw Jun 27, 2019
8a4d0c2
sync function deprecations
richardliaw Jun 29, 2019
907d250
syncfunction
richardliaw Jun 29, 2019
167c269
Add documentation for Syncing and Uploading
richardliaw Jun 30, 2019
bd47848
nit
richardliaw Jun 30, 2019
95f59ac
BaseSyncer as base for Mixin in edge case
richardliaw Jun 30, 2019
5a2ef5e
more docs
richardliaw Jul 1, 2019
7246c57
clean up assertions
richardliaw Jul 1, 2019
4fabe08
validate
richardliaw Jul 1, 2019
bb4a950
nit
richardliaw Jul 1, 2019
fdebff3
Update test_cluster.py
richardliaw Jul 1, 2019
cd135d7
betterdoc
richardliaw Jul 1, 2019
652050f
Update tune-usage.rst
richardliaw Jul 2, 2019
94e3cac
cleanup
richardliaw Jul 2, 2019
6243211
Merge branch 'bunch-of-log-sync-fixes' of github.com:hartikainen/ray …
richardliaw Jul 2, 2019
d3cefa8
Merge branch 'master' into bunch-of-log-sync-fixes
richardliaw Jul 2, 2019
f825ec0
nit
richardliaw Jul 2, 2019
36 changes: 22 additions & 14 deletions doc/source/tune-usage.rst
@@ -259,7 +259,7 @@ of a trial, you can additionally set the checkpoint_at_end to True. An example i
Recovering From Failures (Experimental)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Tune automatically persists the progress of your experiments, so if an experiment crashes or is otherwise cancelled, it can be resumed with ``resume=True``. The default setting of ``resume=False`` creates a new experiment, and ``resume="prompt"`` will cause Tune to prompt you for whether you want to resume. You can always force a new experiment to be created by changing the experiment name.
Tune automatically persists the progress of your experiments, so if an experiment crashes or is otherwise cancelled, it can be resumed by passing one of ``True``, ``False``, ``"LOCAL"``, ``"REMOTE"``, or ``"PROMPT"`` to ``tune.run(resume=...)``. The default setting of ``resume=False`` creates a new experiment. ``resume="LOCAL"`` and ``resume=True`` restore the experiment from ``local_dir/[experiment_name]``. ``resume="REMOTE"`` syncs the upload dir down to the local dir and then restores the experiment from ``local_dir/[experiment_name]``. ``resume="PROMPT"`` will cause Tune to prompt you for whether you want to resume. You can always force a new experiment to be created by changing the experiment name.

Note that trials will be restored to their last checkpoint. If trial checkpointing is not enabled, unfinished trials will be restarted from scratch.
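
For example, a resumable run could look like the following (a minimal sketch; ``MyTrainableClass`` and the experiment name are placeholders):

.. code-block:: python

    tune.run(
        MyTrainableClass,
        name="experiment_name",
        local_dir="~/ray_results",
        checkpoint_freq=10,  # checkpointing lets unfinished trials restore
        resume=True,  # restores from ~/ray_results/experiment_name if present
    )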

@@ -399,31 +399,39 @@ An example can be found in `logging_example.py <https://github.com/ray-project/r
Custom Sync/Upload Commands
~~~~~~~~~~~~~~~~~~~~~~~~~~~

If an upload directory is provided, Tune will automatically sync results to the given
directory with standard S3/gsutil commands. You can customize the upload command by
providing either a function or a string.
Tune automatically syncs the trial folder on remote nodes back to the head node. This requires the Ray cluster to be started with the `autoscaler <autoscaling.html>`__.
By default, this syncing requires ``rsync`` to be installed. You can customize the sync command with the ``sync_to_driver`` argument in ``tune.run`` by providing either a function or a string.

If a string is provided, then it must include replacement fields ``{local_dir}`` and
``{remote_dir}``, like ``"aws s3 sync {local_dir} {remote_dir}"``.

Alternatively, a function can be provided with the following signature (and must
be wrapped with ``tune.function``):
If a string is provided, then it must include replacement fields ``{source}`` and ``{target}``, like ``rsync -savz -e "ssh -i ssh_key.pem" {source} {target}``. Alternatively, a function can be provided with the following signature (and must be wrapped with ``tune.function``):

.. code-block:: python

    def custom_sync_func(local_dir, remote_dir):
        sync_cmd = "aws s3 sync {local_dir} {remote_dir}".format(
            local_dir=local_dir,
            remote_dir=remote_dir)
    def custom_sync_func(source, target):
        sync_cmd = "rsync {source} {target}".format(
            source=source,
            target=target)
        sync_process = subprocess.Popen(sync_cmd, shell=True)
        sync_process.wait()

    tune.run(
        MyTrainableClass,
        name="experiment_name",
        sync_function=tune.function(custom_sync_func)
        sync_to_driver=tune.function(custom_sync_func),
    )

When syncing results back to the driver, the source would be a path similar to ``ubuntu@192.0.0.1:/home/ubuntu/ray_results/trial1``, and the target would be a local path.
This custom sync command would also be used on node failures, where the source argument would be the path to the trial directory and the target would be a remote path. The ``sync_to_driver`` function would be invoked to push a checkpoint to a new node so that a queued trial can resume.
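
For instance, the string form could be passed directly (a sketch assuming ``rsync`` is installed; the SSH key path is a placeholder):

.. code-block:: python

    tune.run(
        MyTrainableClass,
        name="experiment_name",
        sync_to_driver='rsync -savz -e "ssh -i ssh_key.pem" {source} {target}',
    )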

If an upload directory is provided, Tune will automatically sync results to the given directory, natively supporting standard S3/gsutil commands.
You can customize this to specify arbitrary storages with the ``sync_to_cloud`` argument. This argument is similar to ``sync_to_driver`` in that it supports strings with the same replacement fields and arbitrary functions. See `syncer.py <https://github.com/ray-project/ray/blob/master/python/ray/tune/syncer.py>`__ for implementation details.

.. code-block:: python

    tune.run(
        MyTrainableClass,
        name="experiment_name",
        sync_to_cloud=tune.function(custom_sync_func),
    )
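
A string command works here as well, for example (a sketch assuming the AWS CLI is available; the bucket path is a placeholder):

.. code-block:: python

    tune.run(
        MyTrainableClass,
        name="experiment_name",
        upload_dir="s3://your_bucket/path",
        sync_to_cloud="aws s3 sync {source} {target}",
    )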

Tune Client API
---------------
12 changes: 12 additions & 0 deletions python/ray/rllib/train.py
@@ -10,6 +10,7 @@
import ray
from ray.tests.cluster_utils import Cluster
from ray.tune.config_parser import make_parser
from ray.tune.result import DEFAULT_RESULTS_DIR
from ray.tune.trial import resources_to_json
from ray.tune.tune import _make_scheduler, run_experiments

@@ -71,6 +72,17 @@ def create_parser(parser_creator=None):
default="default",
type=str,
help="Name of the subdirectory under `local_dir` to put results in.")
parser.add_argument(
"--local-dir",
default=DEFAULT_RESULTS_DIR,
type=str,
help="Local dir to save training results to. Defaults to '{}'.".format(
DEFAULT_RESULTS_DIR))
parser.add_argument(
"--upload-dir",
default="",
type=str,
help="Optional URI to sync training results to (e.g. s3://bucket).")
parser.add_argument(
"--resume",
action="store_true",
17 changes: 2 additions & 15 deletions python/ray/tune/config_parser.py
@@ -10,7 +10,6 @@
from six import string_types

from ray.tune import TuneError
from ray.tune.result import DEFAULT_RESULTS_DIR
from ray.tune.trial import Trial, json_to_resources
from ray.tune.logger import _SafeFallbackEncoder

@@ -65,17 +64,6 @@ def make_parser(parser_creator=None, **kwargs):
        default=1,
        type=int,
        help="Number of times to repeat each trial.")
    parser.add_argument(
        "--local-dir",
        default=DEFAULT_RESULTS_DIR,
        type=str,
        help="Local dir to save training results to. Defaults to '{}'.".format(
            DEFAULT_RESULTS_DIR))
    parser.add_argument(
        "--upload-dir",
        default="",
        type=str,
        help="Optional URI to sync training results to (e.g. s3://bucket).")
    parser.add_argument(
        "--checkpoint-freq",
        default=0,
@@ -183,7 +171,7 @@ def create_trial_from_spec(spec, output_path, parser, **trial_kwargs):
        trainable_name=spec["run"],
        # json.load leads to str -> unicode in py2.7
        config=spec.get("config", {}),
        local_dir=os.path.join(args.local_dir, output_path),
        local_dir=os.path.join(spec["local_dir"], output_path),
        # json.load leads to str -> unicode in py2.7
        stopping_criterion=spec.get("stop", {}),
        checkpoint_freq=args.checkpoint_freq,
@@ -193,10 +181,9 @@ def create_trial_from_spec(spec, output_path, parser, **trial_kwargs):
        export_formats=spec.get("export_formats", []),
        # str(None) doesn't create None
        restore_path=spec.get("restore"),
        upload_dir=args.upload_dir,
        trial_name_creator=spec.get("trial_name_creator"),
        loggers=spec.get("loggers"),
        # str(None) doesn't create None
        sync_function=spec.get("sync_function"),
        sync_to_driver_fn=spec.get("sync_to_driver"),
        max_failures=args.max_failures,
        **trial_kwargs)
13 changes: 5 additions & 8 deletions python/ray/tune/examples/logging_example.py
@@ -11,9 +11,8 @@

import numpy as np

import ray
from ray import tune
from ray.tune import Trainable, run, Experiment
from ray.tune import Trainable, run


class TestLogger(tune.logger.Logger):
@@ -60,11 +59,11 @@ def _restore(self, checkpoint_path):
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing")
    args, _ = parser.parse_known_args()
    ray.init()
    exp = Experiment(

    trials = run(
        MyTrainableClass,
        name="hyperband_test",
        run=MyTrainableClass,
        num_samples=1,
        num_samples=5,
        trial_name_creator=tune.function(trial_str_creator),
        loggers=[TestLogger],
        stop={"training_iteration": 1 if args.smoke_test else 99999},
@@ -73,5 +72,3 @@ def _restore(self, checkpoint_path):
                lambda spec: 10 + int(90 * random.random())),
            "height": tune.sample_from(lambda spec: int(100 * random.random()))
        })

    trials = run(exp)
27 changes: 15 additions & 12 deletions python/ray/tune/experiment.py
@@ -52,7 +52,6 @@ class Experiment(object):
        >>>     },
        >>>     num_samples=10,
        >>>     local_dir="~/ray_results",
        >>>     upload_dir="s3://your_bucket/path",
        >>>     checkpoint_freq=10,
        >>>     max_failures=2)
    """
@@ -68,7 +67,7 @@ def __init__(self,
                 upload_dir=None,
                 trial_name_creator=None,
                 loggers=None,
                 sync_function=None,
                 sync_to_driver=None,
                 checkpoint_freq=0,
                 checkpoint_at_end=False,
                 keep_checkpoints_num=None,
@@ -78,18 +77,16 @@
                 restore=None,
                 repeat=None,
                 trial_resources=None,
                 custom_loggers=None):
        if sync_function:
            assert upload_dir, "Need `upload_dir` if sync_function given."

                 custom_loggers=None,
                 sync_function=None):
        if repeat:
            _raise_deprecation_note("repeat", "num_samples", soft=False)
        if trial_resources:
            _raise_deprecation_note(
                "trial_resources", "resources_per_trial", soft=False)
        if custom_loggers:
            _raise_deprecation_note("custom_loggers", "loggers", soft=False)

        if sync_function:
            _raise_deprecation_note(
                "sync_function", "sync_to_driver", soft=False)
        run_identifier = Experiment._register_if_needed(run)
        spec = {
            "run": run_identifier,
@@ -98,10 +95,10 @@
"resources_per_trial": resources_per_trial,
"num_samples": num_samples,
"local_dir": os.path.expanduser(local_dir or DEFAULT_RESULTS_DIR),
"upload_dir": upload_dir or "", # argparse converts None to "null"
"upload_dir": upload_dir,
"trial_name_creator": trial_name_creator,
"loggers": loggers,
"sync_function": sync_function,
"sync_to_driver": sync_to_driver,
"checkpoint_freq": checkpoint_freq,
"checkpoint_at_end": checkpoint_at_end,
"keep_checkpoints_num": keep_checkpoints_num,
@@ -182,7 +179,13 @@ def local_dir(self):

    @property
    def checkpoint_dir(self):
        return os.path.join(self.spec["local_dir"], self.name)
        if self.local_dir:
            return os.path.join(self.local_dir, self.name)

    @property
    def remote_checkpoint_dir(self):
        if self.spec["upload_dir"]:
            return os.path.join(self.spec["upload_dir"], self.name)


def convert_to_experiment_list(experiments):