From d83d9b1f4e18a221593607e0351a0accdab462eb Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Tue, 14 May 2019 11:46:09 -0700 Subject: [PATCH 1/7] Fix submit --- python/ray/scripts/scripts.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 01c8e267b85a..f83719710380 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -555,7 +555,7 @@ def rsync_up(cluster_config_file, source, target, cluster_name): rsync(cluster_config_file, source, target, cluster_name, down=False) -@cli.command() +@cli.command(context_settings={"ignore_unknown_options": True}) @click.argument("cluster_config_file", required=True, type=str) @click.option( "--docker", @@ -588,7 +588,8 @@ def rsync_up(cluster_config_file, source, target, cluster_name): @click.option( "--port-forward", required=False, type=int, help="Port to forward.") @click.argument("script", required=True, type=str) -@click.argument("script_args", required=False, type=str, nargs=-1) +@click.argument( + "script_args", required=False, type=click.UNPROCESSED, nargs=-1) def submit(cluster_config_file, docker, screen, tmux, stop, start, cluster_name, port_forward, script, script_args): """Uploads and runs a script on the specified cluster. From 9e0096984cce91969a8eba2688ce6fc46107241a Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Tue, 14 May 2019 16:50:06 -0700 Subject: [PATCH 2/7] fixcomamnds --- python/ray/scripts/scripts.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index f83719710380..19c6f33def2e 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -587,16 +587,18 @@ def rsync_up(cluster_config_file, source, target, cluster_name): help="Override the configured cluster name.") @click.option( "--port-forward", required=False, type=int, help="Port to forward.") -@click.argument("script", required=True, type=str) -@click.argument( - "script_args", required=False, type=click.UNPROCESSED, nargs=-1) +@click.argument("script", required=True, type=str, help="Script to upload.") +@click.option("--args", required=False, type=str, help="Script args.") def submit(cluster_config_file, docker, screen, tmux, stop, start, - cluster_name, port_forward, script, script_args): + cluster_name, port_forward, script, args): """Uploads and runs a script on the specified cluster. The script is automatically synced to the following location: os.path.join("~", os.path.basename(script)) + + Example: + >>> ray submit [CLUSTER.YAML] experiment.py --args="--smoke-test" """ assert not (screen and tmux), "Can specify only one of `screen` or `tmux`." @@ -607,7 +609,7 @@ def submit(cluster_config_file, docker, screen, tmux, stop, start, target = os.path.join("~", os.path.basename(script)) rsync(cluster_config_file, script, target, cluster_name, down=False) - cmd = " ".join(["python", target] + list(script_args)) + cmd = " ".join(["python", target] + args.split(" ")) exec_cluster(cluster_config_file, cmd, docker, screen, tmux, stop, False, cluster_name, port_forward) From 00504f480bb3d6ca700d98f062fb68bcb8692ce3 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Tue, 14 May 2019 17:04:02 -0700 Subject: [PATCH 3/7] First pass --- python/ray/autoscaler/commands.py | 7 +++++- python/ray/autoscaler/updater.py | 38 ++++++++++++++++++------------- python/ray/scripts/scripts.py | 10 ++++---- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 9a89261be7de..6484d1d164b5 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -448,7 +448,12 @@ def rsync(config_file, source, target, override_cluster_name, down): rsync = updater.rsync_down else: rsync = updater.rsync_up - rsync(source, target, check_error=False) + + if source and target: + rsync(source, target, check_error=False) + else: + updater.sync_file_mounts(rsync) + finally: provider.cleanup() diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 9fff0c767467..aade70278f79 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -183,25 +183,11 @@ def wait_for_ssh(self, deadline): return False - def do_update(self): - self.provider.set_node_tags(self.node_id, - {TAG_RAY_NODE_STATUS: "waiting-for-ssh"}) - - deadline = time.time() + NODE_START_WAIT_S - self.set_ssh_ip_if_required() - - # Wait for SSH access - with LogTimer("NodeUpdater: " "{}: Got SSH".format(self.node_id)): - ssh_ok = self.wait_for_ssh(deadline) - assert ssh_ok, "Unable to SSH to node" - + def sync_file_mounts(self, sync_cmd): # Rsync file mounts self.provider.set_node_tags(self.node_id, {TAG_RAY_NODE_STATUS: "syncing-files"}) for remote_path, local_path in self.file_mounts.items(): - logger.info("NodeUpdater: " - "{}: Syncing {} to {}...".format( - self.node_id, local_path, remote_path)) assert os.path.exists(local_path), local_path if os.path.isdir(local_path): if not local_path.endswith("/"): @@ -217,7 +203,21 @@ def do_update(self): "mkdir -p {}".format(os.path.dirname(remote_path)), redirect=redirect, ) - self.rsync_up(local_path, remote_path, redirect=redirect) + sync_cmd(local_path, remote_path, redirect=redirect) + + def do_update(self): + self.provider.set_node_tags(self.node_id, + {TAG_RAY_NODE_STATUS: "waiting-for-ssh"}) + + deadline = time.time() + NODE_START_WAIT_S + self.set_ssh_ip_if_required() + + # Wait for SSH access + with LogTimer("NodeUpdater: " "{}: Got SSH".format(self.node_id)): + ssh_ok = self.wait_for_ssh(deadline) + assert ssh_ok, "Unable to SSH to node" + + self.sync_file_mounts(self.rsync_up) # Run init commands self.provider.set_node_tags(self.node_id, @@ -236,6 +236,9 @@ def do_update(self): self.ssh_cmd(cmd, redirect=redirect) def rsync_up(self, source, target, redirect=None, check_error=True): + logger.info("NodeUpdater: " + "{}: Syncing {} to {}...".format(self.node_id, source, + target)) self.set_ssh_ip_if_required() self.get_caller(check_error)( [ @@ -247,6 +250,9 @@ def rsync_up(self, source, target, redirect=None, check_error=True): stderr=redirect or sys.stderr) def rsync_down(self, source, target, redirect=None, check_error=True): + logger.info("NodeUpdater: " + "{}: Syncing {} from {}...".format(self.node_id, source, + target)) self.set_ssh_ip_if_required() self.get_caller(check_error)( [ diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 19c6f33def2e..811fd59d3028 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -529,8 +529,8 @@ def attach(cluster_config_file, start, tmux, cluster_name, new): @cli.command() @click.argument("cluster_config_file", required=True, type=str) -@click.argument("source", required=True, type=str) -@click.argument("target", required=True, type=str) +@click.argument("source", required=False, type=str) +@click.argument("target", required=False, type=str) @click.option( "--cluster-name", "-n", @@ -538,13 +538,14 @@ def attach(cluster_config_file, start, tmux, cluster_name, new): type=str, help="Override the configured cluster name.") def rsync_down(cluster_config_file, source, target, cluster_name): + assert bool(source) == bool(target), "Must provide source and target." rsync(cluster_config_file, source, target, cluster_name, down=True) @cli.command() @click.argument("cluster_config_file", required=True, type=str) -@click.argument("source", required=True, type=str) -@click.argument("target", required=True, type=str) +@click.argument("source", required=False, type=str) +@click.argument("target", required=False, type=str) @click.option( "--cluster-name", "-n", @@ -552,6 +553,7 @@ def rsync_down(cluster_config_file, source, target, cluster_name): type=str, help="Override the configured cluster name.") def rsync_up(cluster_config_file, source, target, cluster_name): + assert bool(source) == bool(target), "Must provide source and target." rsync(cluster_config_file, source, target, cluster_name, down=False) From f2b7a5c1b609f61c427b625da897bd5e438d2217 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Tue, 14 May 2019 17:07:08 -0700 Subject: [PATCH 4/7] Comments and fix --- python/ray/scripts/scripts.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 19c6f33def2e..1951af208573 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -587,7 +587,7 @@ def rsync_up(cluster_config_file, source, target, cluster_name): help="Override the configured cluster name.") @click.option( "--port-forward", required=False, type=int, help="Port to forward.") -@click.argument("script", required=True, type=str, help="Script to upload.") +@click.argument("script", required=True, type=str) @click.option("--args", required=False, type=str, help="Script args.") def submit(cluster_config_file, docker, screen, tmux, stop, start, cluster_name, port_forward, script, args): @@ -609,7 +609,7 @@ def submit(cluster_config_file, docker, screen, tmux, stop, start, target = os.path.join("~", os.path.basename(script)) rsync(cluster_config_file, script, target, cluster_name, down=False) - cmd = " ".join(["python", target] + args.split(" ")) + cmd = " ".join(["python", target, args]) exec_cluster(cluster_config_file, cmd, docker, screen, tmux, stop, False, cluster_name, port_forward) From 083841101bf2dee06b9ca73830022de654c20486 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Tue, 14 May 2019 17:12:04 -0700 Subject: [PATCH 5/7] Move tags --- python/ray/autoscaler/updater.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index aade70278f79..c86750fe399d 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -185,8 +185,6 @@ def wait_for_ssh(self, deadline): def sync_file_mounts(self, sync_cmd): # Rsync file mounts - self.provider.set_node_tags(self.node_id, - {TAG_RAY_NODE_STATUS: "syncing-files"}) for remote_path, local_path in self.file_mounts.items(): assert os.path.exists(local_path), local_path if os.path.isdir(local_path): @@ -217,6 +215,8 @@ def do_update(self): ssh_ok = self.wait_for_ssh(deadline) assert ssh_ok, "Unable to SSH to node" + self.provider.set_node_tags(self.node_id, + {TAG_RAY_NODE_STATUS: "syncing-files"}) self.sync_file_mounts(self.rsync_up) # Run init commands From 662a6c3bbed17909fbba809e4e7e20cb0bb6576e Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Wed, 15 May 2019 12:41:34 -0700 Subject: [PATCH 6/7] UPdateexample --- python/ray/tune/examples/mnist_pytorch_trainable.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/ray/tune/examples/mnist_pytorch_trainable.py b/python/ray/tune/examples/mnist_pytorch_trainable.py index ac26d0353a98..7163dcfd6a01 100644 --- a/python/ray/tune/examples/mnist_pytorch_trainable.py +++ b/python/ray/tune/examples/mnist_pytorch_trainable.py @@ -49,6 +49,11 @@ action="store_true", default=False, help="disables CUDA training") +parser.add_argument( + "--redis-address", + default=None, + type=str, + help="The Redis address of the cluster.") parser.add_argument( "--seed", type=int, @@ -173,7 +178,7 @@ def _restore(self, checkpoint_path): from ray import tune from ray.tune.schedulers import HyperBandScheduler - ray.init() + ray.init(redis_address=args.redis_address) sched = HyperBandScheduler( time_attr="training_iteration", reward_attr="neg_mean_loss") tune.run( From d0512bcbcc0f7e74bd77aa58e2cf20837585bacf Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 16 May 2019 16:23:22 -0700 Subject: [PATCH 7/7] both or none --- python/ray/autoscaler/commands.py | 2 ++ python/ray/scripts/scripts.py | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 6484d1d164b5..faaef8c6a153 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -423,6 +423,8 @@ def rsync(config_file, source, target, override_cluster_name, down): override_cluster_name: set the name of the cluster down: whether we're syncing remote -> local """ + assert bool(source) == bool(target), ( + "Must either provide both or neither source and target.") config = yaml.load(open(config_file).read()) if override_cluster_name is not None: diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index d1d0c9bc513b..e993478c1011 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -538,7 +538,6 @@ def attach(cluster_config_file, start, tmux, cluster_name, new): type=str, help="Override the configured cluster name.") def rsync_down(cluster_config_file, source, target, cluster_name): - assert bool(source) == bool(target), "Must provide source and target." rsync(cluster_config_file, source, target, cluster_name, down=True) @@ -553,7 +552,6 @@ def rsync_down(cluster_config_file, source, target, cluster_name): type=str, help="Override the configured cluster name.") def rsync_up(cluster_config_file, source, target, cluster_name): - assert bool(source) == bool(target), "Must provide source and target." rsync(cluster_config_file, source, target, cluster_name, down=False)