Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/source/cluster/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,8 @@ The user that Ray will authenticate with when launching new nodes.
.. group-tab:: AWS

A string specifying a comma-separated list of availability zone(s) that nodes may be launched in.
Nodes will be launched in the first listed availability zone and will be tried in the following availability
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this statement to be true, I think we also need to change https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/_private/aws/config.py#L443
From:
subnets = [s for s in subnets if s.availability_zone in azs]
To:
subnets = [s for az in azs for s in subnets if s.availability_zone == az]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, great catch! I changed this and added a test for this!

zones if launching fails.

* **Required:** No
* **Importance:** Low
Expand Down
5 changes: 4 additions & 1 deletion python/ray/autoscaler/_private/aws/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,10 @@ def _configure_subnet(config):

if "availability_zone" in config["provider"]:
azs = config["provider"]["availability_zone"].split(",")
subnets = [s for s in subnets if s.availability_zone in azs]
subnets = [
s for az in azs # Iterate over AZs first to maintain the ordering
for s in subnets if s.availability_zone == az
]
if not subnets:
cli_logger.abort(
"No usable subnets matching availability zone {} found.\n"
Expand Down
19 changes: 11 additions & 8 deletions python/ray/autoscaler/_private/aws/node_provider.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import random
import copy
import threading
from collections import defaultdict, OrderedDict
Expand Down Expand Up @@ -98,9 +97,6 @@ def __init__(self, provider_config, cluster_name):
max_retries=0,
aws_credentials=aws_credentials)

# Try availability zones round-robin, starting from random offset
self.subnet_idx = random.randint(0, 100)

# Tags that we believe to actually be on EC2.
self.tag_cache = {}
# Tags that we will soon upload.
Expand Down Expand Up @@ -378,8 +374,13 @@ def _create_node(self, node_config, tags, count):
"TagSpecifications": tag_specs
})

# Try to always launch in the first listed subnet.
subnet_idx = 0
cli_logger_tags = {}
for attempt in range(1, BOTO_CREATE_MAX_RETRIES + 1):
# NOTE: This ensures that we try ALL availability zones before
# throwing an error.
max_tries = max(BOTO_CREATE_MAX_RETRIES, len(subnet_ids))
for attempt in range(1, max_tries + 1):
try:
if "NetworkInterfaces" in conf:
net_ifs = conf["NetworkInterfaces"]
Expand All @@ -388,8 +389,7 @@ def _create_node(self, node_config, tags, count):
conf.pop("SecurityGroupIds", None)
cli_logger_tags["network_interfaces"] = str(net_ifs)
else:
subnet_id = subnet_ids[self.subnet_idx % len(subnet_ids)]
self.subnet_idx += 1
subnet_id = subnet_ids[subnet_idx % len(subnet_ids)]
conf["SubnetId"] = subnet_id
cli_logger_tags["subnet_id"] = subnet_id

Expand Down Expand Up @@ -421,7 +421,7 @@ def _create_node(self, node_config, tags, count):
info=state_reason["Message"]))
break
except botocore.exceptions.ClientError as exc:
if attempt == BOTO_CREATE_MAX_RETRIES:
if attempt == max_tries:
cli_logger.abort(
"Failed to launch instances. Max attempts exceeded.",
exc=exc,
Expand All @@ -430,6 +430,9 @@ def _create_node(self, node_config, tags, count):
cli_logger.warning(
"create_instances: Attempt failed with {}, retrying.",
exc)
# Launch failure may be due to instance type availability in
# the given AZ
subnet_idx += 1
return created_nodes_dict

def terminate_node(self, node_id):
Expand Down
3 changes: 3 additions & 0 deletions python/ray/autoscaler/aws/example-full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ available_node_types:
InstanceType: m5.large
ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
# Run workers on spot by default. Comment this out to use on-demand.
# NOTE: If relying on spot instances, it is best to specify multiple different instance
# types to avoid interruption when one instance type is experiencing heightened demand.
# Demand information can be found at https://aws.amazon.com/ec2/spot/instance-advisor/
InstanceMarketOptions:
MarketType: spot
# Additional options can be found in the boto docs, e.g.
Expand Down
30 changes: 28 additions & 2 deletions python/ray/tests/aws/test_autoscaler_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import pytest
from unittest.mock import Mock, patch

from ray.autoscaler._private.aws.config import _get_vpc_id_or_die, \
bootstrap_aws, log_to_cli, \
from ray.autoscaler._private.aws.config import _configure_subnet, \
_get_vpc_id_or_die, bootstrap_aws, log_to_cli, \
DEFAULT_AMI
from ray.autoscaler._private.aws.node_provider import AWSNodeProvider
from ray.autoscaler._private.providers import _get_node_provider
Expand Down Expand Up @@ -694,6 +694,32 @@ def mock_get_cached_node(node_id):
assert nodes_to_include_in_call == nodes_included_in_call


def test_use_subnets_ordered_by_az(ec2_client_stub):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks for adding this test! AZ ordering is an important feature to protect from regressions now that we've promised that we'll pack nodes into AZs in the order given in config.

"""
This test validates that when bootstrap_aws populates the SubnetIds field,
the subnets are ordered the same way as availability zones.

"""
# Add a response with a twenty subnets round-robined across the 4 AZs in
# `us-west-2` (a,b,c,d). At the end we should only have 15 subnets, ordered
# first from `us-west-2c`, then `us-west-2d`, then `us-west-2a`.
stubs.describe_twenty_subnets_in_different_azs(ec2_client_stub)

base_config = helpers.load_aws_example_config_file("example-full.yaml")
base_config["provider"][
"availability_zone"] = "us-west-2c,us-west-2d,us-west-2a"
config = _configure_subnet(base_config)

# We've filtered down to only subnets in 2c, 2d & 2a
for node_type in config["available_node_types"].values():
node_config = node_type["node_config"]
assert len(node_config["SubnetIds"]) == 15
offsets = [int(s.split("-")[1]) % 4 for s in node_config["SubnetIds"]]
assert set(offsets[:5]) == {2}, "First 5 should be in us-west-2c"
assert set(offsets[5:10]) == {3}, "Next 5 should be in us-west-2d"
assert set(offsets[10:15]) == {0}, "Last 5 should be in us-west-2a"


if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))
11 changes: 11 additions & 0 deletions python/ray/tests/aws/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ def subnet_in_vpc(vpc_num):
subnet_in_vpc(vpc_num) for vpc_num in range(1, 1000)
] + [DEFAULT_SUBNET]


def subnet_in_az(idx):
azs = ["a", "b", "c", "d"]
subnet = copy.copy(DEFAULT_SUBNET)
subnet["AvailabilityZone"] = "us-west-2" + azs[idx % 4]
subnet["SubnetId"] = f"subnet-{idx:07d}"
return subnet


TWENTY_SUBNETS_IN_DIFFERENT_AZS = [subnet_in_az(i) for i in range(20)]

# Secondary EC2 subnet to expose to tests as required.
AUX_SUBNET = {
"AvailabilityZone": "us-west-2a",
Expand Down
9 changes: 8 additions & 1 deletion python/ray/tests/aws/utils/stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from ray.tests.aws.utils.mocks import mock_path_exists_key_pair
from ray.tests.aws.utils.constants import DEFAULT_INSTANCE_PROFILE, \
DEFAULT_KEY_PAIR, DEFAULT_SUBNET, A_THOUSAND_SUBNETS_IN_DIFFERENT_VPCS, \
DEFAULT_LT
DEFAULT_LT, TWENTY_SUBNETS_IN_DIFFERENT_AZS

from unittest import mock

Expand Down Expand Up @@ -52,6 +52,13 @@ def describe_a_thousand_subnets_in_different_vpcs(ec2_client_stub):
service_response={"Subnets": A_THOUSAND_SUBNETS_IN_DIFFERENT_VPCS})


def describe_twenty_subnets_in_different_azs(ec2_client_stub):
ec2_client_stub.add_response(
"describe_subnets",
expected_params={},
service_response={"Subnets": TWENTY_SUBNETS_IN_DIFFERENT_AZS})


def skip_to_configure_sg(ec2_client_stub, iam_client_stub):
configure_iam_role_default(iam_client_stub)
configure_key_pair_default(ec2_client_stub)
Expand Down