diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py
index a3d6923147a4..6a049080b8ce 100644
--- a/python/ray/autoscaler/autoscaler.py
+++ b/python/ray/autoscaler/autoscaler.py
@@ -53,6 +53,10 @@
     # The number of workers to launch initially, in addition to the head node.
     "initial_workers": (int, OPTIONAL),
 
+    # Whether or not to scale aggressively, e.g. to jump back to at least
+    # initial_workers if we're ever below it and are scaling up
+    "aggressive_autoscaling": (bool, OPTIONAL),
+
     # The autoscaler will scale up the cluster to this target fraction of
     # resources usage. For example, if a cluster of 8 nodes is 100% busy
     # and target_utilization was 0.8, it would resize the cluster to 10.
@@ -512,9 +516,13 @@ def target_num_workers(self):
         ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
         ideal_num_workers = ideal_num_nodes - 1  # subtract 1 for head node
 
+        initial_workers = self.config["initial_workers"]
+        aggressive = self.config["aggressive_autoscaling"]
         if self.bringup:
-            ideal_num_workers = max(ideal_num_workers,
-                                    self.config["initial_workers"])
+            ideal_num_workers = max(ideal_num_workers, initial_workers)
+        elif aggressive and ideal_num_workers >= 0:
+            # Any usage at all (ideal >= 0) means at least initial_workers
+            ideal_num_workers = max(ideal_num_workers, initial_workers)
 
         return min(self.config["max_workers"],
                    max(self.config["min_workers"], ideal_num_workers))
diff --git a/python/ray/autoscaler/aws/example-full.yaml b/python/ray/autoscaler/aws/example-full.yaml
index f7c412e01e90..1d1c30031020 100644
--- a/python/ray/autoscaler/aws/example-full.yaml
+++ b/python/ray/autoscaler/aws/example-full.yaml
@@ -14,6 +14,11 @@ max_workers: 2
 # subsequent `ray up`) this number of nodes will be started.
 initial_workers: 0
 
+# Whether or not to autoscale aggressively. If this is enabled, then whenever
+# we would start more workers, we start at least enough to bring us to
+# initial_workers.
+aggressive_autoscaling: false
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
diff --git a/python/ray/autoscaler/aws/example-gpu-docker.yaml b/python/ray/autoscaler/aws/example-gpu-docker.yaml
index 8540ef584d6c..eaf9a255c174 100644
--- a/python/ray/autoscaler/aws/example-gpu-docker.yaml
+++ b/python/ray/autoscaler/aws/example-gpu-docker.yaml
@@ -14,6 +14,11 @@ max_workers: 2
 # subsequent `ray up`) this number of nodes will be started.
 initial_workers: 0
 
+# Whether or not to autoscale aggressively. If this is enabled, then whenever
+# we would start more workers, we start at least enough to bring us to
+# initial_workers.
+aggressive_autoscaling: false
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
diff --git a/python/ray/autoscaler/gcp/example-full.yaml b/python/ray/autoscaler/gcp/example-full.yaml
index 480a9827f8e2..bbf622baf588 100644
--- a/python/ray/autoscaler/gcp/example-full.yaml
+++ b/python/ray/autoscaler/gcp/example-full.yaml
@@ -14,6 +14,11 @@ max_workers: 2
 # subsequent `ray up`) this number of nodes will be started.
 initial_workers: 0
 
+# Whether or not to autoscale aggressively. If this is enabled, then whenever
+# we would start more workers, we start at least enough to bring us to
+# initial_workers.
+aggressive_autoscaling: false
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
diff --git a/python/ray/autoscaler/gcp/example-gpu-docker.yaml b/python/ray/autoscaler/gcp/example-gpu-docker.yaml
index b29a9f8f169c..4d5a032eb5c5 100644
--- a/python/ray/autoscaler/gcp/example-gpu-docker.yaml
+++ b/python/ray/autoscaler/gcp/example-gpu-docker.yaml
@@ -14,6 +14,11 @@ max_workers: 2
 # subsequent `ray up`) this number of nodes will be started.
 initial_workers: 0
 
+# Whether or not to autoscale aggressively. If this is enabled, then whenever
+# we would start more workers, we start at least enough to bring us to
+# initial_workers.
+aggressive_autoscaling: false
+
 # This executes all commands on all nodes in the docker container,
 # and opens all the necessary ports to support the Ray cluster.
 # Empty string means disabled.
diff --git a/python/ray/autoscaler/local/example-full.yaml b/python/ray/autoscaler/local/example-full.yaml
index 51f64cbfbb6e..dda120f73a4a 100644
--- a/python/ray/autoscaler/local/example-full.yaml
+++ b/python/ray/autoscaler/local/example-full.yaml
@@ -2,6 +2,7 @@ cluster_name: default
 min_workers: 0
 max_workers: 0
 initial_workers: 0
+aggressive_autoscaling: false
 docker:
     image: ""
     container_name: ""
diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py
index 97e13de028db..b4ba0ee2e536 100644
--- a/python/ray/dashboard/dashboard.py
+++ b/python/ray/dashboard/dashboard.py
@@ -141,6 +141,7 @@ async def ray_config(_) -> aiohttp.web.Response:
                 "min_workers": cfg["min_workers"],
                 "max_workers": cfg["max_workers"],
                 "initial_workers": cfg["initial_workers"],
+                "aggressive_autoscaling": cfg["aggressive_autoscaling"],
                 "idle_timeout_minutes": cfg["idle_timeout_minutes"],
             }
 
@@ -181,7 +182,9 @@ def log_dashboard_url(self):
     def run(self):
         self.log_dashboard_url()
         self.node_stats.start()
-        aiohttp.web.run_app(self.app, host=self.ip, port=self.port)
+        # NOTE(review): this binds to every interface rather than self.ip;
+        # confirm the dashboard is meant to be reachable from off the node.
+        aiohttp.web.run_app(self.app, host="0.0.0.0", port=self.port)
 
 
 class NodeStats(threading.Thread):
diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py
index c9de2c0cbf21..74c747ab19f3 100644
--- a/python/ray/scripts/scripts.py
+++ b/python/ray/scripts/scripts.py
@@ -151,7 +151,7 @@ def cli(logging_level, logging_format):
     "--include-webui",
     is_flag=True,
     default=False,
-    help="provide this argument if the UI should not be started")
+    help="provide this argument if the UI should be started")
 @click.option(
     "--block",
     is_flag=True,
@@ -256,6 +256,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
         raylet_socket_name=raylet_socket_name,
         temp_dir=temp_dir,
         include_java=include_java,
+        include_webui=include_webui,
         java_worker_options=java_worker_options,
         load_code_from_local=load_code_from_local,
         _internal_config=internal_config)
@@ -334,9 +335,6 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
         if redis_max_clients is not None:
             raise Exception("If --head is not passed in, --redis-max-clients "
                             "must not be provided.")
-        if include_webui:
-            raise Exception("If --head is not passed in, the --include-webui "
-                            "flag is not relevant.")
         if include_java is not None:
             raise ValueError("--include-java should only be set for the head "
                              "node.")
diff --git a/python/ray/tune/examples/skopt_example.py b/python/ray/tune/examples/skopt_example.py
index 71ac60951528..5112ebc7ba02 100644
--- a/python/ray/tune/examples/skopt_example.py
+++ b/python/ray/tune/examples/skopt_example.py
@@ -48,9 +48,13 @@ def easy_objective(config, reporter):
         }
     }
     optimizer = Optimizer([(0, 20), (-100, 100)])
+    previously_run_params = [[10, 0], [15, -20]]
+    known_rewards = [-189, -1144]
     algo = SkOptSearch(
         optimizer, ["width", "height"],
         max_concurrent=4,
-        reward_attr="neg_mean_loss")
+        reward_attr="neg_mean_loss",
+        points_to_evaluate=previously_run_params,
+        evaluated_rewards=known_rewards)
     scheduler = AsyncHyperBandScheduler(reward_attr="neg_mean_loss")
     run_experiments(config, search_alg=algo, scheduler=scheduler)
diff --git a/python/ray/tune/suggest/skopt.py b/python/ray/tune/suggest/skopt.py
index 039c9d015b94..5618b50521b6 100644
--- a/python/ray/tune/suggest/skopt.py
+++ b/python/ray/tune/suggest/skopt.py
@@ -1,6 +1,11 @@
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
+import numbers
+try:
+    from collections.abc import Iterable
+except ImportError:  # Python 2: the ABCs still live in `collections`
+    from collections import Iterable
 
 try:
     import skopt
@@ -24,10 +29,23 @@ class SkOptSearch(SuggestionAlgorithm):
             to 10.
         reward_attr (str): The training result objective value attribute.
             This refers to an increasing value.
+        points_to_evaluate (list of lists): A list of trials you'd like to run
+            first before sampling from the optimiser, e.g. these could be
+            parameter configurations you already know work well to help
+            the optimiser select good values. Each trial is a list of the
+            parameters of that trial using the order definition given
+            to the optimiser (see example below)
+        evaluated_rewards (list): If you have previously evaluated the
+            parameters passed in as points_to_evaluate you can avoid
+            re-running those trials by passing in the reward attributes
+            as a list so the optimiser can be told the results without
+            needing to re-compute the trial. Must be the same length as
+            points_to_evaluate. (See skopt_example.py)
 
     Example:
         >>> from skopt import Optimizer
         >>> optimizer = Optimizer([(0,20),(-100,100)])
+        >>> current_best_params = [[10, 0], [15, -20]]
         >>> config = {
         >>>     "my_exp": {
         >>>         "run": "exp",
@@ -39,7 +57,7 @@ class SkOptSearch(SuggestionAlgorithm):
         >>> }
         >>> algo = SkOptSearch(optimizer,
         >>>     ["width", "height"], max_concurrent=4,
-        >>>     reward_attr="neg_mean_loss")
+        >>>     reward_attr="neg_mean_loss", points_to_evaluate=current_best_params)
     """
 
     def __init__(self,
@@ -47,11 +65,43 @@ def __init__(self,
                  parameter_names,
                  max_concurrent=10,
                  reward_attr="episode_reward_mean",
+                 points_to_evaluate=None,
+                 evaluated_rewards=None,
                  **kwargs):
         assert skopt is not None, """skopt must be installed!
             You can install Skopt with the command:
             `pip install scikit-optimize`."""
         assert type(max_concurrent) is int and max_concurrent > 0
+        if points_to_evaluate is None:
+            points_to_evaluate = []
+        elif len(points_to_evaluate) > 0 and not isinstance(
+                points_to_evaluate[0], (list, tuple)):
+            # A single flat point was given; wrap it in a list of points.
+            points_to_evaluate = [points_to_evaluate]
+        if not isinstance(points_to_evaluate, list):
+            raise ValueError(
+                "`points_to_evaluate` should be a list, but got %s" %
+                type(points_to_evaluate))
+        if isinstance(evaluated_rewards, Iterable):
+            evaluated_rewards = list(evaluated_rewards)
+        elif isinstance(evaluated_rewards, numbers.Number):
+            evaluated_rewards = [evaluated_rewards]
+        self._initial_points = []
+        if points_to_evaluate and evaluated_rewards:
+            if not (isinstance(evaluated_rewards, Iterable)
+                    or isinstance(evaluated_rewards, numbers.Number)):
+                raise ValueError(
+                    "`evaluated_rewards` should be an iterable or a scalar, got %s"
+                    % type(evaluated_rewards))
+            if len(points_to_evaluate) != len(evaluated_rewards):
+                raise ValueError(
+                    "`points_to_evaluate` and `evaluated_rewards` should have the same length"
+                )
+            optimizer.tell(points_to_evaluate, evaluated_rewards)
+        elif points_to_evaluate:
+            # Copy, so that _suggest() consuming points does not mutate the
+            # list the caller passed in.
+            self._initial_points = list(points_to_evaluate)
         self._max_concurrent = max_concurrent
         self._parameters = parameter_names
         self._reward_attr = reward_attr
@@ -62,7 +112,10 @@ def __init__(self,
     def _suggest(self, trial_id):
         if self._num_live_trials() >= self._max_concurrent:
             return None
-        suggested_config = self._skopt_opt.ask()
+        if self._initial_points:
+            suggested_config = self._initial_points.pop(0)
+        else:
+            suggested_config = self._skopt_opt.ask()
         self._live_trial_mapping[trial_id] = suggested_config
         return dict(zip(self._parameters, suggested_config))
 