diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py
index 350a6250cdd2..096b081f2ac8 100644
--- a/python/ray/autoscaler/commands.py
+++ b/python/ray/autoscaler/commands.py
@@ -82,35 +82,37 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
 
     provider = get_node_provider(config["provider"], config["cluster_name"])
 
-    def remaining_nodes():
-        if workers_only:
-            A = []
-        else:
-            A = [
+    try:
+
+        def remaining_nodes():
+            if workers_only:
+                A = []
+            else:
+                A = [
+                    node_id for node_id in provider.nodes({
+                        TAG_RAY_NODE_TYPE: "head"
+                    })
+                ]
+
+            A += [
                 node_id for node_id in provider.nodes({
-                    TAG_RAY_NODE_TYPE: "head"
+                    TAG_RAY_NODE_TYPE: "worker"
                 })
             ]
-
-        A += [
-            node_id for node_id in provider.nodes({
-                TAG_RAY_NODE_TYPE: "worker"
-            })
-        ]
-        return A
-
-    # Loop here to check that both the head and worker nodes are actually
-    # really gone
-    A = remaining_nodes()
-    with LogTimer("teardown_cluster: Termination done."):
-        while A:
-            logger.info("teardown_cluster: "
-                        "Terminating {} nodes...".format(len(A)))
-            provider.terminate_nodes(A)
-            time.sleep(1)
-            A = remaining_nodes()
-
-    provider.cleanup()
+            return A
+
+        # Loop here to check that both the head and worker nodes are actually
+        # really gone
+        A = remaining_nodes()
+        with LogTimer("teardown_cluster: Termination done."):
+            while A:
+                logger.info("teardown_cluster: "
+                            "Terminating {} nodes...".format(len(A)))
+                provider.terminate_nodes(A)
+                time.sleep(1)
+                A = remaining_nodes()
+    finally:
+        provider.cleanup()
 
 
 def kill_node(config_file, yes, override_cluster_name):
@@ -147,121 +149,125 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only,
                             yes, override_cluster_name):
     """Create the cluster head node, which in turn creates the workers."""
     provider = get_node_provider(config["provider"], config["cluster_name"])
-    head_node_tags = {
-        TAG_RAY_NODE_TYPE: "head",
-    }
-    nodes = provider.nodes(head_node_tags)
-    if len(nodes) > 0:
+    try:
+        head_node_tags = {
+            TAG_RAY_NODE_TYPE: "head",
+        }
+        nodes = provider.nodes(head_node_tags)
+        if len(nodes) > 0:
+            head_node = nodes[0]
+        else:
+            head_node = None
+
+        if not head_node:
+            confirm("This will create a new cluster", yes)
+        elif not no_restart:
+            confirm("This will restart cluster services", yes)
+
+        launch_hash = hash_launch_conf(config["head_node"], config["auth"])
+        if head_node is None or provider.node_tags(head_node).get(
+                TAG_RAY_LAUNCH_CONFIG) != launch_hash:
+            if head_node is not None:
+                confirm("Head node config out-of-date. It will be terminated",
+                        yes)
+            logger.info(
+                "get_or_create_head_node: "
+                "Terminating outdated head node {}".format(head_node))
+            provider.terminate_node(head_node)
+        logger.info("get_or_create_head_node: Launching new head node...")
+        head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
+        head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
+            config["cluster_name"])
+        provider.create_node(config["head_node"], head_node_tags, 1)
+
+        nodes = provider.nodes(head_node_tags)
+        assert len(nodes) == 1, "Failed to create head node."
         head_node = nodes[0]
-    else:
-        head_node = None
-
-    if not head_node:
-        confirm("This will create a new cluster", yes)
-    elif not no_restart:
-        confirm("This will restart cluster services", yes)
-
-    launch_hash = hash_launch_conf(config["head_node"], config["auth"])
-    if head_node is None or provider.node_tags(head_node).get(
-            TAG_RAY_LAUNCH_CONFIG) != launch_hash:
-        if head_node is not None:
-            confirm("Head node config out-of-date. It will be terminated",
-                    yes)
-        logger.info("get_or_create_head_node: "
-                    "Terminating outdated head node {}".format(head_node))
-        provider.terminate_node(head_node)
-    logger.info("get_or_create_head_node: Launching new head node...")
-    head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
-    head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
-        config["cluster_name"])
-    provider.create_node(config["head_node"], head_node_tags, 1)
-
-    nodes = provider.nodes(head_node_tags)
-    assert len(nodes) == 1, "Failed to create head node."
-    head_node = nodes[0]
-
-    # TODO(ekl) right now we always update the head node even if the hash
-    # matches. We could prompt the user for what they want to do in this case.
-    runtime_hash = hash_runtime_conf(config["file_mounts"], config)
-    logger.info("get_or_create_head_node: Updating files on head node...")
-
-    # Rewrite the auth config so that the head node can update the workers
-    remote_key_path = "~/ray_bootstrap_key.pem"
-    remote_config = copy.deepcopy(config)
-    remote_config["auth"]["ssh_private_key"] = remote_key_path
-
-    # Adjust for new file locations
-    new_mounts = {}
-    for remote_path in config["file_mounts"]:
-        new_mounts[remote_path] = remote_path
-    remote_config["file_mounts"] = new_mounts
-    remote_config["no_restart"] = no_restart
-
-    # Now inject the rewritten config and SSH key into the head node
-    remote_config_file = tempfile.NamedTemporaryFile(
-        "w", prefix="ray-bootstrap-")
-    remote_config_file.write(json.dumps(remote_config))
-    remote_config_file.flush()
-    config["file_mounts"].update({
-        remote_key_path: config["auth"]["ssh_private_key"],
-        "~/ray_bootstrap_config.yaml": remote_config_file.name
-    })
-
-    if restart_only:
-        init_commands = config["head_start_ray_commands"]
-    elif no_restart:
-        init_commands = (
-            config["setup_commands"] + config["head_setup_commands"])
-    else:
-        init_commands = (
-            config["setup_commands"] + config["head_setup_commands"] +
-            config["head_start_ray_commands"])
-
-    updater = NodeUpdaterThread(
-        head_node,
-        config["provider"],
-        provider,
-        config["auth"],
-        config["cluster_name"],
-        config["file_mounts"],
-        init_commands,
-        runtime_hash,
-    )
-    updater.start()
-    updater.join()
-
-    # Refresh the node cache so we see the external ip if available
-    provider.nodes(head_node_tags)
-
-    if updater.exitcode != 0:
-        logger.error("get_or_create_head_node: "
-                     "Updating {} failed".format(
-                         provider.external_ip(head_node)))
-        sys.exit(1)
-    logger.info("get_or_create_head_node: "
-                "Head node up-to-date, IP address is: {}".format(
-                    provider.external_ip(head_node)))
-
-    monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
-    for s in init_commands:
-        if ("ray start" in s and "docker exec" in s
-                and "--autoscaling-config" in s):
-            monitor_str = "docker exec {} /bin/sh -c {}".format(
-                config["docker"]["container_name"], quote(monitor_str))
-    if override_cluster_name:
-        modifiers = " --cluster-name={}".format(quote(override_cluster_name))
-    else:
-        modifiers = ""
-    print("To monitor auto-scaling activity, you can run:\n\n"
-          "  ray exec {} {}{}\n".format(config_file, quote(monitor_str),
-                                        modifiers))
-    print("To open a console on the cluster:\n\n"
-          "  ray attach {}{}\n".format(config_file, modifiers))
-    print("To ssh manually to the cluster, run:\n\n"
-          "  ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
-                                       config["auth"]["ssh_user"],
-                                       provider.external_ip(head_node)))
-    provider.cleanup()
+
+        # TODO(ekl) right now we always update the head node even if the hash
+        # matches. We could prompt the user for what they want to do here.
+        runtime_hash = hash_runtime_conf(config["file_mounts"], config)
+        logger.info("get_or_create_head_node: Updating files on head node...")
+
+        # Rewrite the auth config so that the head node can update the workers
+        remote_key_path = "~/ray_bootstrap_key.pem"
+        remote_config = copy.deepcopy(config)
+        remote_config["auth"]["ssh_private_key"] = remote_key_path
+
+        # Adjust for new file locations
+        new_mounts = {}
+        for remote_path in config["file_mounts"]:
+            new_mounts[remote_path] = remote_path
+        remote_config["file_mounts"] = new_mounts
+        remote_config["no_restart"] = no_restart
+
+        # Now inject the rewritten config and SSH key into the head node
+        remote_config_file = tempfile.NamedTemporaryFile(
+            "w", prefix="ray-bootstrap-")
+        remote_config_file.write(json.dumps(remote_config))
+        remote_config_file.flush()
+        config["file_mounts"].update({
+            remote_key_path: config["auth"]["ssh_private_key"],
+            "~/ray_bootstrap_config.yaml": remote_config_file.name
+        })
+
+        if restart_only:
+            init_commands = config["head_start_ray_commands"]
+        elif no_restart:
+            init_commands = (
+                config["setup_commands"] + config["head_setup_commands"])
+        else:
+            init_commands = (
+                config["setup_commands"] + config["head_setup_commands"] +
+                config["head_start_ray_commands"])
+
+        updater = NodeUpdaterThread(
+            head_node,
+            config["provider"],
+            provider,
+            config["auth"],
+            config["cluster_name"],
+            config["file_mounts"],
+            init_commands,
+            runtime_hash,
+        )
+        updater.start()
+        updater.join()
+
+        # Refresh the node cache so we see the external ip if available
+        provider.nodes(head_node_tags)
+
+        if updater.exitcode != 0:
+            logger.error("get_or_create_head_node: "
+                         "Updating {} failed".format(
+                             provider.external_ip(head_node)))
+            sys.exit(1)
+        logger.info("get_or_create_head_node: "
+                    "Head node up-to-date, IP address is: {}".format(
+                        provider.external_ip(head_node)))
+
+        monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
+        for s in init_commands:
+            if ("ray start" in s and "docker exec" in s
+                    and "--autoscaling-config" in s):
+                monitor_str = "docker exec {} /bin/sh -c {}".format(
+                    config["docker"]["container_name"], quote(monitor_str))
+        if override_cluster_name:
+            modifiers = " --cluster-name={}".format(
+                quote(override_cluster_name))
+        else:
+            modifiers = ""
+        print("To monitor auto-scaling activity, you can run:\n\n"
+              "  ray exec {} {}{}\n".format(config_file, quote(monitor_str),
+                                            modifiers))
+        print("To open a console on the cluster:\n\n"
+              "  ray attach {}{}\n".format(config_file, modifiers))
+        print("To ssh manually to the cluster, run:\n\n"
+              "  ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
+                                           config["auth"]["ssh_user"],
+                                           provider.external_ip(head_node)))
+    finally:
+        provider.cleanup()
 
 
 def attach_cluster(config_file, start, use_tmux, override_cluster_name, new):
@@ -314,43 +320,45 @@ def exec_cluster(config_file, cmd, screen, tmux, stop, start,
     head_node = _get_head_node(
         config, config_file, override_cluster_name, create_if_needed=start)
     provider = get_node_provider(config["provider"], config["cluster_name"])
-    updater = NodeUpdaterThread(
-        head_node,
-        config["provider"],
-        provider,
-        config["auth"],
-        config["cluster_name"],
-        config["file_mounts"],
-        [],
-        "",
-    )
-    if stop:
-        cmd += ("; ray stop; ray teardown ~/ray_bootstrap_config.yaml --yes "
+    try:
+        updater = NodeUpdaterThread(
+            head_node,
+            config["provider"],
+            provider,
+            config["auth"],
+            config["cluster_name"],
+            config["file_mounts"],
+            [],
+            "",
+        )
+        if stop:
+            cmd += (
+                "; ray stop; ray teardown ~/ray_bootstrap_config.yaml --yes "
                 "--workers-only; sudo shutdown -h now")
-    _exec(
-        updater,
-        cmd,
-        screen,
-        tmux,
-        expect_error=stop,
-        port_forward=port_forward)
-
-    if tmux or screen:
-        attach_command_parts = ["ray attach", config_file]
-        if override_cluster_name is not None:
-            attach_command_parts.append(
-                "--cluster-name={}".format(override_cluster_name))
-        if tmux:
-            attach_command_parts.append("--tmux")
-        elif screen:
-            attach_command_parts.append("--screen")
-
-        attach_command = " ".join(attach_command_parts)
-        attach_info = "Use `{}` to check on command status.".format(
-            attach_command)
-        logger.info(attach_info)
-
-    provider.cleanup()
+        _exec(
+            updater,
+            cmd,
+            screen,
+            tmux,
+            expect_error=stop,
+            port_forward=port_forward)
+
+        if tmux or screen:
+            attach_command_parts = ["ray attach", config_file]
+            if override_cluster_name is not None:
+                attach_command_parts.append(
+                    "--cluster-name={}".format(override_cluster_name))
+            if tmux:
+                attach_command_parts.append("--tmux")
+            elif screen:
+                attach_command_parts.append("--screen")
+
+            attach_command = " ".join(attach_command_parts)
+            attach_info = "Use `{}` to check on command status.".format(
+                attach_command)
+            logger.info(attach_info)
+    finally:
+        provider.cleanup()
 
 
 def _exec(updater, cmd, screen, tmux, expect_error=False, port_forward=None):
@@ -395,23 +403,24 @@ def rsync(config_file, source, target, override_cluster_name, down):
     head_node = _get_head_node(
         config, config_file, override_cluster_name, create_if_needed=False)
     provider = get_node_provider(config["provider"], config["cluster_name"])
-    updater = NodeUpdaterThread(
-        head_node,
-        config["provider"],
-        provider,
-        config["auth"],
-        config["cluster_name"],
-        config["file_mounts"],
-        [],
-        "",
-    )
-    if down:
-        rsync = updater.rsync_down
-    else:
-        rsync = updater.rsync_up
-    rsync(source, target, check_error=False)
-
-    provider.cleanup()
+    try:
+        updater = NodeUpdaterThread(
+            head_node,
+            config["provider"],
+            provider,
+            config["auth"],
+            config["cluster_name"],
+            config["file_mounts"],
+            [],
+            "",
+        )
+        if down:
+            rsync = updater.rsync_down
+        else:
+            rsync = updater.rsync_up
+        rsync(source, target, check_error=False)
+    finally:
+        provider.cleanup()
 
 
 def get_head_node_ip(config_file, override_cluster_name):
@@ -422,9 +431,11 @@ def get_head_node_ip(config_file, override_cluster_name):
         config["cluster_name"] = override_cluster_name
 
     provider = get_node_provider(config["provider"], config["cluster_name"])
-    head_node = _get_head_node(config, config_file, override_cluster_name)
-    ip = provider.external_ip(head_node)
-    provider.cleanup()
+    try:
+        head_node = _get_head_node(config, config_file, override_cluster_name)
+        ip = provider.external_ip(head_node)
+    finally:
+        provider.cleanup()
     return ip
 
 
@@ -445,11 +456,13 @@ def _get_head_node(config,
                    override_cluster_name,
                    create_if_needed=False):
     provider = get_node_provider(config["provider"], config["cluster_name"])
-    head_node_tags = {
-        TAG_RAY_NODE_TYPE: "head",
-    }
-    nodes = provider.nodes(head_node_tags)
-    provider.cleanup()
+    try:
+        head_node_tags = {
+            TAG_RAY_NODE_TYPE: "head",
+        }
+        nodes = provider.nodes(head_node_tags)
+    finally:
+        provider.cleanup()
 
     if len(nodes) > 0:
         head_node = nodes[0]
diff --git a/python/ray/rllib/setup-rllib-dev.py b/python/ray/rllib/setup-rllib-dev.py
index 3876a83f7988..d85f048d561c 100755
--- a/python/ray/rllib/setup-rllib-dev.py
+++ b/python/ray/rllib/setup-rllib-dev.py
@@ -11,28 +11,38 @@
 
 import ray
 
-if __name__ == "__main__":
-    rllib_home = os.path.abspath(os.path.join(ray.__file__, "../rllib"))
-    local_home = os.path.abspath(os.path.dirname(__file__))
-    assert os.path.isdir(rllib_home), rllib_home
+
+
+def do_link(package):
+    package_home = os.path.abspath(
+        os.path.join(ray.__file__, "../{}".format(package)))
+    local_home = os.path.abspath(
+        os.path.join(__file__, "../../{}".format(package)))
+    assert os.path.isdir(package_home), package_home
     assert os.path.isdir(local_home), local_home
-    click.confirm(
-        "This will replace:\n  {}\nwith a symlink to:\n  {}".format(
-            rllib_home, local_home),
-        abort=True)
-    if os.access(os.path.dirname(rllib_home), os.W_OK):
-        subprocess.check_call(["rm", "-rf", rllib_home])
-        subprocess.check_call(["ln", "-s", local_home, rllib_home])
+    if not click.confirm(
+            "This will replace:\n  {}\nwith a symlink to:\n  {}".format(
+                package_home, local_home),
+            default=True):
+        return
+    if os.access(os.path.dirname(package_home), os.W_OK):
+        subprocess.check_call(["rm", "-rf", package_home])
+        subprocess.check_call(["ln", "-s", local_home, package_home])
     else:
         print("You don't have write permission to {}, using sudo:".format(
-            rllib_home))
-        subprocess.check_call(["sudo", "rm", "-rf", rllib_home])
-        subprocess.check_call(["sudo", "ln", "-s", local_home, rllib_home])
+            package_home))
+        subprocess.check_call(["sudo", "rm", "-rf", package_home])
+        subprocess.check_call(["sudo", "ln", "-s", local_home, package_home])
+
+
+if __name__ == "__main__":
+    do_link("rllib")
+    do_link("tune")
+    do_link("autoscaler")
     print("Created links.\n\nIf you run into issues initializing Ray, please "
-          "ensure that your local repo and the installed Ray is in sync "
+          "ensure that your local repo and the installed Ray are in sync "
          "(pip install -U the latest wheels at "
           "https://ray.readthedocs.io/en/latest/installation.html, "
           "and ensure you are up-to-date on the master branch on git).\n\n"
-          "Note that you may need to delete the rllib symlink when pip "
+          "Note that you may need to delete the package symlinks when pip "
          "installing new Ray versions to prevent pip from overwriting files "
           "in your git repo.")