Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions doc/source/cluster/autoscaling.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
.. _ref-autoscaling:

Cluster Autoscaling
===================

Basics
------

The Ray Cluster Launcher will automatically enable a load-based autoscaler. When cluster resource usage exceeds a configurable threshold (80% by default), new nodes will be launched up to the specified ``max_workers`` limit (specified in the cluster config). When nodes are idle for more than a timeout, they will be removed, down to the ``min_workers`` limit. The head node is never removed.

In more detail, the autoscaler implements the following control loop:

1. It calculates the estimated utilization of the cluster based on the most-currently-assigned resource. For example, suppose a cluster has 100/200 CPUs assigned, but 20/25 GPUs assigned, then the utilization will be considered to be max(100/200, 15/25) = 60%.
2. If the estimated utilization is greater than the target (80% by default), then the autoscaler will attempt to add nodes to the cluster.
3. If a node is idle for a timeout (5 minutes by default), it is removed from the cluster.

The basic autoscaling config settings are as follows:

.. code::

# An unique identifier for the head node and workers of this cluster.
cluster_name: default

# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 0

# The autoscaler will scale up the cluster to this target fraction of resource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# The autoscaler will scale up the cluster to this target fraction of resource
# The autoscaler will scale up the cluster to this target fraction of CPU

Not sure if this is accurate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually resource usage, not CPU

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we specify what resources are considered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated this in the paragraph above.

# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# The max value allowed is 1.0, which is the most conservative setting.
target_utilization_fraction: 0.8

# If a node is idle for this many minutes, it will be removed. A node is
# considered idle if there are no tasks or actors running on it.
idle_timeout_minutes: 5

Multiple Node Type Autoscaling
------------------------------

Ray supports multiple node types in a single cluster. In this mode of operation, the scheduler will look at the queue of resource shape demands from the cluster (e.g., there might be 10 tasks queued each requesting ``{"GPU": 4, "CPU": 16}``), and tries to add the minimum set of nodes that can fulfill these resource demands. This enables precise, rapid scale up compared to looking only at resource utilization, as the autoscaler also has visiblity into the aggregate resource load.

The concept of a cluster node type encompasses both the physical instance type (e.g., AWS p3.8xl GPU nodes vs m4.16xl CPU nodes), as well as other attributes (e.g., IAM role, the machine image, etc). `Custom resources <configure.html>`__ can be specified for each node type so that Ray is aware of the demand for specific node types at the application level (e.g., a task may request to be placed on a machine with a specific role or machine image via custom resource).

Multi-node type autoscaling operates in conjunction with the basic autoscaler. You may want to configure the basic autoscaler accordingly to act conservatively (i.e., set ``target_utilization_fraction: 1.0``).

An example of configuring multiple node types is as follows `(full example) <https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/aws/example-multi-node-type.yaml>`__:

.. code::

# Specify the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
cpu_4_ondemand:
node_config:
InstanceType: m4.xlarge
resources: {"CPU": 4}
max_workers: 5
cpu_16_spot:
node_config:
InstanceType: m4.4xlarge
InstanceMarketOptions:
MarketType: spot
resources: {"CPU": 16, "Custom1": 1, "is_spot": 1}
max_workers: 10
gpu_1_ondemand:
node_config:
InstanceType: p2.xlarge
resources: {"CPU": 4, "GPU": 1, "Custom2": 2}
max_workers: 4
worker_setup_commands:
- pip install tensorflow-gpu # Example command.
gpu_8_ondemand:
node_config:
InstanceType: p2.8xlarge
resources: {"CPU": 32, "GPU": 8}
max_workers: 2
worker_setup_commands:
- pip install tensorflow-gpu # Example command.

# Specify the node type of the head node (as configured above).
head_node_type: cpu_4_ondemand

# Specify the default type of the worker node (as configured above).
worker_default_node_type: cpu_16_spot


The above config defines two CPU node types (``cpu_4_ondemand`` and ``cpu_16_spot``), and two GPU types (``gpu_1_ondemand`` and ``gpu_8_ondemand``). Each node type has a name (e.g., ``cpu_4_ondemand``), which has no semantic meaning and is only for debugging. Let's look at the inner fields of the ``gpu_1_ondemand`` node type:

The node config tells the underlying Cloud provider how to launch a node of this type. This node config is merged with the top level node config of the YAML and can override fields (i.e., to specify the p2.xlarge instance type here):

.. code::

node_config:
InstanceType: p2.xlarge

The resources field tells the autoscaler what kinds of resources this node provides. This can include custom resources as well (e.g., "Custom2"). This field enables the autoscaler to automatically select the right kind of nodes to launch given the resource demands of the application. The resources specified here will be automatically passed to the ``ray start`` command for the node via an environment variable. For more information, see also the `resource demand scheduler <https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/resource_demand_scheduler.py>`__:

.. code::

resources: {"CPU": 4, "GPU": 1, "Custom2": 2}

The ``max_workers`` field constrains the number of nodes of this type that can be launched:

.. code::

max_workers: 4

The ``worker_setup_commands`` field can be used to override the setup and initialization commands for a node type. Note that you can only override the setup for worker nodes. The head node's setup commands are always configured via the top level field in the cluster YAML:

.. code::

worker_setup_commands:
- pip install tensorflow-gpu # Example command.
9 changes: 0 additions & 9 deletions doc/source/cluster/launcher.rst
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,6 @@ This tells ``ray up`` to sync the current git branch SHA from your personal comp
2. Commit the changes with ``git commit`` and ``git push``
3. Update files on your Ray cluster with ``ray up``


Autoscaling
-----------

The Ray Cluster Launcher will automatically enable a load-based autoscaler. When cluster resource usage exceeds a configurable threshold (80% by default), new nodes will be launched up the specified ``max_workers`` limit (in the cluster config). When nodes are idle for more than a timeout, they will be removed, down to the ``min_workers`` limit. The head node is never removed.

The default idle timeout is 5 minutes, which can be set in the cluster config. This is to prevent excessive node churn which could impact performance and increase costs (in AWS / GCP there is a minimum billing charge of 1 minute per instance, after which usage is billed by the second).


Questions or Issues?
--------------------

Expand Down
1 change: 1 addition & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Academic Papers

cluster/index.rst
cluster/launcher.rst
cluster/autoscaling.rst
cluster/cloud.rst
cluster/deploy.rst

Expand Down
2 changes: 1 addition & 1 deletion python/ray/autoscaler/aws/example-full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ docker:
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
# This max value allowed is 1.0, which is the most conservative setting.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# This max value allowed is 1.0, which is the most conservative setting.
# The max value allowed is 1.0, which is the most conservative setting.

target_utilization_fraction: 0.8

# If a node is idle for this many minutes, it will be removed.
Expand Down
15 changes: 7 additions & 8 deletions python/ray/autoscaler/aws/example-multi-node-type.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,33 @@ available_node_types:
InstanceType: m4.xlarge
resources: {"CPU": 4}
max_workers: 5
cpu_4_spot:
cpu_16_spot:
node_config:
InstanceType: m4.xlarge
InstanceType: m4.4xlarge
InstanceMarketOptions:
MarketType: spot
resources: {"CPU": 4}
max_workers: 20
cpu_16_ondemand:
node_config:
InstanceType: m4.4xlarge
resources: {"CPU": 16, "Custom1": 1}
max_workers: 10
gpu_1_ondemand:
node_config:
InstanceType: p2.xlarge
resources: {"CPU": 4, "GPU": 1, "Custom2": 2}
max_workers: 4
worker_setup_commands:
- pip install tensorflow-gpu # Example command.
gpu_8_ondemand:
node_config:
InstanceType: p2.8xlarge
resources: {"CPU": 32, "GPU": 8}
max_workers: 2
worker_setup_commands:
- pip install tensorflow-gpu # Example command.

# Specify the node type of the head node (as configured above).
head_node_type: cpu_4_ondemand

# Specify the default type of the worker node (as configured above).
worker_default_node_type: cpu_4_spot
worker_default_node_type: cpu_16_spot

# The default settings for the head node. This will be merged with the per-node
# type configs given above.
Expand Down
84 changes: 50 additions & 34 deletions python/ray/autoscaler/resource_demand_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
"""Implements multi-node-type autoscaling.

This file implements an autoscaling algorithm that is aware of multiple node
types (e.g., example-multi-node-type.yaml). The Ray autoscaler will pass in
a vector of resource shape demands, and the resource demand scheduler will
return a list of node types that can satisfy the demands given constraints
(i.e., reverse bin packing).
"""

import copy
import numpy as np
import logging
Expand Down Expand Up @@ -30,18 +39,43 @@ def __init__(self, provider: NodeProvider,
self.node_types = node_types
self.max_workers = max_workers

def debug_string(self, nodes: List[NodeID],
pending_nodes: Dict[NodeID, int]) -> str:
# TODO(ekl) take into account existing utilization of node resources. We
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# should subtract these from node resources prior to running bin packing.
def get_nodes_to_launch(self, nodes: List[NodeID],
pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict]
) -> List[Tuple[NodeType, int]]:
"""Given resource demands, return node types to add to the cluster.

This method:
(1) calculates the resources present in the cluster.
(2) calculates the unfulfilled resource bundles.
(3) calculates which nodes need to be launched to fulfill all
the bundle requests, subject to max_worker constraints.

Args:
nodes: List of existing nodes in the cluster.
pending_nodes: Summary of node types currently being launched.
resource_demands: Vector of resource demands from the scheduler.
"""

if resource_demands is None:
logger.info("No resource demands")
return []

node_resources, node_type_counts = self.calculate_node_resources(
nodes, pending_nodes)
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))

out = "Worker node types:"
for node_type, count in node_type_counts.items():
out += "\n - {}: {}".format(node_type, count)
if pending_nodes.get(node_type):
out += " ({} pending)".format(pending_nodes[node_type])
unfulfilled = get_bin_pack_residual(node_resources, resource_demands)
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))

return out
nodes = get_nodes_for(self.node_types, node_type_counts,
self.max_workers - len(nodes), unfulfilled)
logger.info("Node requests: {}".format(nodes))
return nodes

def calculate_node_resources(
self, nodes: List[NodeID], pending_nodes: Dict[NodeID, int]
Expand Down Expand Up @@ -73,36 +107,18 @@ def add_node(node_type):

return node_resources, node_type_counts

def get_nodes_to_launch(self, nodes: List[NodeID],
pending_nodes: Dict[NodeType, int],
resource_demands: List[ResourceDict]
) -> List[Tuple[NodeType, int]]:
"""Get a list of node types that should be added to the cluster.

This method:
(1) calculates the resources present in the cluster.
(2) calculates the unfulfilled resource bundles.
(3) calculates which nodes need to be launched to fulfill all
the bundle requests, subject to max_worker constraints.
"""

if resource_demands is None:
logger.info("No resource demands")
return []

def debug_string(self, nodes: List[NodeID],
pending_nodes: Dict[NodeID, int]) -> str:
node_resources, node_type_counts = self.calculate_node_resources(
nodes, pending_nodes)
logger.info("Cluster resources: {}".format(node_resources))
logger.info("Node counts: {}".format(node_type_counts))

unfulfilled = get_bin_pack_residual(node_resources, resource_demands)
logger.info("Resource demands: {}".format(resource_demands))
logger.info("Unfulfilled demands: {}".format(unfulfilled))
out = "Worker node types:"
for node_type, count in node_type_counts.items():
out += "\n - {}: {}".format(node_type, count)
if pending_nodes.get(node_type):
out += " ({} pending)".format(pending_nodes[node_type])

nodes = get_nodes_for(self.node_types, node_type_counts,
self.max_workers - len(nodes), unfulfilled)
logger.info("Node requests: {}".format(nodes))
return nodes
return out


def get_nodes_for(node_types: Dict[NodeType, NodeTypeConfigDict],
Expand Down