95 changes: 68 additions & 27 deletions python/ray/autoscaler/_private/kuberay/autoscaling_config.py
@@ -7,7 +7,10 @@

import requests

from ray._private.label_utils import parse_node_labels_string
from ray._private.label_utils import (
parse_node_labels_string,
validate_node_label_syntax,
)
from ray.autoscaler._private.constants import (
DISABLE_LAUNCH_CONFIG_CHECK_KEY,
DISABLE_NODE_UPDATERS_KEY,
@@ -225,22 +228,28 @@ def _get_ray_resources_from_group_spec(
group_spec: Dict[str, Any], is_head: bool
) -> Dict[str, int]:
"""
Infers Ray resources from rayStartCommands and K8s limits.
Infers Ray resources from the group `resources` field, rayStartParams, or K8s limits.
The resources extracted are used in autoscaling calculations.

TODO: Expose a better interface in the RayCluster CRD for Ray resource annotations.
For now, we take the rayStartParams as the primary source of truth.
"""
# Set resources from the top-level group 'resources' field if it exists.
group_resources = group_spec.get("resources", {})

ray_start_params = group_spec.get("rayStartParams", {})
# In KubeRay, the Ray container is always the first application container of a Ray Pod.
k8s_resources = group_spec["template"]["spec"]["containers"][0].get("resources", {})
group_name = _HEAD_GROUP_NAME if is_head else group_spec["groupName"]

num_cpus = _get_num_cpus(ray_start_params, k8s_resources, group_name)
num_gpus = _get_num_gpus(ray_start_params, k8s_resources, group_name)
custom_resource_dict = _get_custom_resources(ray_start_params, group_name)
num_tpus = _get_num_tpus(custom_resource_dict, k8s_resources)
memory = _get_memory(ray_start_params, k8s_resources)
num_cpus = _get_num_cpus(
group_resources, ray_start_params, k8s_resources, group_name
)
num_gpus = _get_num_gpus(
group_resources, ray_start_params, k8s_resources, group_name
)
custom_resource_dict = _get_custom_resources(
group_resources, ray_start_params, group_name
)
num_tpus = _get_num_tpus(group_resources, custom_resource_dict, k8s_resources)
memory = _get_memory(group_resources, ray_start_params, k8s_resources)

# It's not allowed to use object store memory as a resource request, so we don't
# add that to the autoscaler's resources annotations.
@@ -302,9 +311,20 @@ def _get_ray_resources_from_group_spec(

def _get_labels_from_group_spec(group_spec: Dict[str, Any]) -> Dict[str, str]:
"""
Parses Ray node labels from rayStartParams for autoscaling config.
Labels are a comma-separated string of key-value pairs.
Parses Ray node labels for the autoscaling config based on the following
priority:
1. Top-level `labels` field in the group spec.
2. `labels` field in `rayStartParams`.
"""

# Check for the top-level structured `labels` field.
if "labels" in group_spec and isinstance(group_spec.get("labels"), dict):
labels_dict = group_spec.get("labels")
# Validate node labels follow expected Kubernetes label syntax.
validate_node_label_syntax(labels_dict)
return labels_dict

# Otherwise, check for `labels` in rayStartParams.
ray_start_params = group_spec.get("rayStartParams", {})
labels_str = ray_start_params.get("labels")

@@ -323,13 +343,16 @@ def _get_labels_from_group_spec(group_spec: Dict[str, Any]) -> Dict[str, str]:


def _get_num_cpus(
group_resources: Dict[str, str],
ray_start_params: Dict[str, str],
k8s_resources: Dict[str, Dict[str, str]],
group_name: str,
) -> int:
"""Get CPU annotation from ray_start_params or k8s_resources,
with priority for ray_start_params.
"""Get CPU annotation from `resources` field, ray_start_params or k8s_resources,
with priority for `resources` field.
"""
if "CPU" in group_resources:
return int(group_resources["CPU"])
if "num-cpus" in ray_start_params:
return int(ray_start_params["num-cpus"])
elif "cpu" in k8s_resources.get("limits", {}):
@@ -348,11 +371,15 @@ def _get_num_cpus(


def _get_memory(
ray_start_params: Dict[str, str], k8s_resources: Dict[str, Dict[str, str]]
group_resources: Dict[str, str],
ray_start_params: Dict[str, str],
k8s_resources: Dict[str, Dict[str, str]],
) -> Optional[int]:
"""Get memory resource annotation from ray_start_params or k8s_resources,
with priority for ray_start_params.
"""Get memory resource annotation from `resources` field, ray_start_params or k8s_resources,
with priority for `resources` field.
"""
if "memory" in group_resources:
return _round_up_k8s_quantity(group_resources["memory"])
if "memory" in ray_start_params:
return int(ray_start_params["memory"])
elif "memory" in k8s_resources.get("limits", {}):
@@ -365,15 +392,17 @@ def _get_memory(


def _get_num_gpus(
group_resources: Dict[str, str],
ray_start_params: Dict[str, str],
k8s_resources: Dict[str, Dict[str, str]],
group_name: str,
) -> Optional[int]:
"""Get memory resource annotation from ray_start_params or k8s_resources,
with priority for ray_start_params.
"""Get memory resource annotation from `resources` field, ray_start_params or k8s_resources,
with priority for `resources` field.
"""

if "num-gpus" in ray_start_params:
if "GPU" in group_resources:
return int(group_resources["GPU"])
elif "num-gpus" in ray_start_params:
return int(ray_start_params["num-gpus"])
else:
for key, resource_quantity in chain(
@@ -394,13 +423,16 @@ def _get_num_gpus(


def _get_num_tpus(
group_resources: Dict[str, str],
custom_resource_dict: Dict[str, int],
k8s_resources: Dict[str, Dict[str, str]],
) -> Optional[int]:
"""Get TPU custom resource annotation from custom_resource_dict in ray_start_params,
or k8s_resources, with priority for custom_resource_dict.
"""Get TPU custom resource annotation from `resources` field, custom_resource_dict in ray_start_params,
or k8s_resources, with priority for `resources` field.
"""
if "TPU" in custom_resource_dict:
if "TPU" in group_resources:
return int(group_resources["TPU"])
elif "TPU" in custom_resource_dict:
return custom_resource_dict["TPU"]
else:
for typ in ["limits", "requests"]:
@@ -430,17 +462,26 @@ def _round_up_k8s_quantity(quantity: str) -> int:


def _get_custom_resources(
ray_start_params: Dict[str, Any], group_name: str
group_resources: Dict[str, str], ray_start_params: Dict[str, Any], group_name: str
) -> Dict[str, int]:
"""Format custom resources based on the `resources` Ray start param.
"""Format custom resources based on the group `resources` field or `resources` Ray start param.

Currently, the value of the `resources` field must
Currently, the value of the `resources` field in rayStartParams must
be formatted as follows:
'"{\"Custom1\": 1, \"Custom2\": 5}"'.

This method first converts the input to a correctly formatted
json string and then loads that json string to a dict.
"""
# If the top-level `resources` field is defined, use it as the exclusive source.
if group_resources:
standard_keys = {"CPU", "GPU", "memory"}
custom_resources = {
k: float(v) for k, v in group_resources.items() if k not in standard_keys
}
return custom_resources

# Otherwise, check rayStartParams.
if "resources" not in ray_start_params:
return {}
resources_string = ray_start_params["resources"]
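For readers skimming the diff above, the following is a minimal, self-contained sketch of the resolution order the updated helpers follow: the group's top-level `resources` field wins, then `rayStartParams`, then the Ray container's Kubernetes limits. The function name `resolve_cpu` and the sample spec are hypothetical illustrations, not part of this PR, and the sketch skips the Kubernetes-quantity rounding (e.g. "500m", "2Gi") that the real `_round_up_k8s_quantity` helper performs.

from typing import Any, Dict, Optional


def resolve_cpu(group_spec: Dict[str, Any]) -> Optional[int]:
    # Hypothetical illustration of the priority chain used by _get_num_cpus above.
    group_resources = group_spec.get("resources", {})
    ray_start_params = group_spec.get("rayStartParams", {})
    k8s_resources = group_spec["template"]["spec"]["containers"][0].get("resources", {})

    # 1. Top-level structured `resources` field takes priority.
    if "CPU" in group_resources:
        return int(group_resources["CPU"])
    # 2. Fall back to rayStartParams.
    if "num-cpus" in ray_start_params:
        return int(ray_start_params["num-cpus"])
    # 3. Finally, fall back to the Ray container's K8s CPU limit
    #    (the real code also rounds fractional quantities up to whole CPUs).
    cpu_limit = k8s_resources.get("limits", {}).get("cpu")
    return int(cpu_limit) if cpu_limit is not None else None


group_spec = {
    "resources": {"CPU": "16"},
    "rayStartParams": {"num-cpus": "4"},
    "template": {"spec": {"containers": [{"resources": {"limits": {"cpu": "8"}}}]}},
}
assert resolve_cpu(group_spec) == 16  # the top-level field wins over the other sources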
103 changes: 100 additions & 3 deletions python/ray/tests/kuberay/test_autoscaling_config.py
@@ -1,5 +1,6 @@
import copy
import platform
import re
import sys
from pathlib import Path
from typing import Any, Dict, Optional, Type
@@ -223,6 +224,70 @@ def _get_ray_cr_with_tpu_k8s_resource_limit_and_custom_resource() -> dict:
return cr


def _get_ray_cr_with_top_level_labels() -> dict:
"""CR with a top-level `labels` field."""
cr = get_basic_ray_cr()
# The top-level structured `labels` field takes priority.
cr["spec"]["workerGroupSpecs"][0]["labels"] = {"instance-type": "mx5"}

# rayStartParams labels field should be ignored.
cr["spec"]["workerGroupSpecs"][0]["rayStartParams"]["labels"] = "instance-type=n2"
return cr


def _get_autoscaling_config_with_top_level_labels() -> dict:
config = _get_basic_autoscaling_config()
config["available_node_types"]["small-group"]["labels"] = {"instance-type": "mx5"}
return config


def _get_ray_cr_with_invalid_top_level_labels() -> dict:
"""CR with a syntactically invalid top-level `labels` field."""
cr = get_basic_ray_cr()
cr["spec"]["workerGroupSpecs"][0]["labels"] = {"!!invalid-key!!": "some-value"}
return cr


def _get_ray_cr_with_top_level_resources() -> dict:
"""CR with a top-level `resources` field to test priority."""
cr = get_basic_ray_cr()

# The top-level resources field should take priority.
cr["spec"]["workerGroupSpecs"][1]["resources"] = {
"CPU": "16",
"GPU": "8",
"memory": "2Gi",
"CustomResource": "99",
}
# These rayStartParams should be ignored.
cr["spec"]["workerGroupSpecs"][1]["rayStartParams"]["num-gpus"] = "1"
cr["spec"]["workerGroupSpecs"][1]["rayStartParams"][
"resources"
] = '"{"Custom2": 1}"'
return cr


def _get_autoscaling_config_with_top_level_resources() -> dict:
config = _get_basic_autoscaling_config()

config["available_node_types"]["gpu-group"]["resources"] = {
"CPU": 16.0,
"GPU": 8.0,
"memory": 2147483648.0,
"CustomResource": 99.0,
}
return config


def _get_ray_cr_with_top_level_tpu_resource() -> dict:
"""CR with a top-level `resources` field for the TPU custom resource."""
cr = _get_ray_cr_with_tpu_k8s_resource_limit_and_custom_resource()

# The top-level field should take priority.
cr["spec"]["workerGroupSpecs"][2]["resources"] = {"TPU": "8"}
return cr


def _get_ray_cr_with_no_tpus() -> dict:
cr = get_basic_ray_cr()
# remove TPU worker group
@@ -408,7 +473,33 @@ def test_resource_quantity(input: str, output: int):
None,
None,
None,
id="groups-with-labels",
id="groups-with-raystartparam-labels",
),
pytest.param(
_get_ray_cr_with_top_level_labels(),
_get_autoscaling_config_with_top_level_labels(),
None,
None,
None,
id="groups-with-top-level-labels",
),
pytest.param(
_get_ray_cr_with_top_level_resources(),
_get_autoscaling_config_with_top_level_resources(),
None,
None,
None,
id="groups-with-top-level-resources",
),
pytest.param(
_get_ray_cr_with_invalid_top_level_labels(),
None,
ValueError,
re.escape(
"Invalid label key name `!!invalid-key!!`. Name must be 63 chars or less beginning and ending with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),dots (.), and alphanumerics between."
),
None,
id="invalid-top-level-labels",
),
]
)
@@ -604,6 +695,11 @@ def test_tpu_node_selectors_to_type(
0,
id="no-tpus-requested",
),
pytest.param(
_get_ray_cr_with_top_level_tpu_resource(),
8,
id="tpu-top-level-resource",
),
]
)

@@ -613,13 +709,14 @@ def test_tpu_node_selectors_to_type(
def test_get_num_tpus(ray_cr_in: Dict[str, Any], expected_num_tpus: int):
"""Verify that _get_num_tpus correctly returns the number of requested TPUs."""
for worker_group in ray_cr_in["spec"]["workerGroupSpecs"]:
group_resources = worker_group.get("resources", {})
ray_start_params = worker_group["rayStartParams"]
custom_resources = _get_custom_resources(
ray_start_params, worker_group["groupName"]
group_resources, ray_start_params, worker_group["groupName"]
)
k8s_resources = worker_group["template"]["spec"]["containers"][0]["resources"]

num_tpus = _get_num_tpus(custom_resources, k8s_resources)
num_tpus = _get_num_tpus(group_resources, custom_resources, k8s_resources)

if worker_group["groupName"] == "tpu-group":
assert num_tpus == expected_num_tpus
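As a usage sketch (hypothetical values, not taken from this PR), the two ways a worker group can declare custom resources and labels after this change look roughly as follows: the new top-level fields are plain mappings, while the legacy `rayStartParams` form keeps the escaped JSON string and comma-separated key=value string described in the docstrings above.

import json

# New form: top-level structured fields. When `resources` is present it is used
# exclusively; with the values below, the parsed custom resources would be
# {"CustomResource": 99.0} (CPU/GPU/memory are handled by their own helpers).
worker_group_new = {
    "groupName": "small-group",
    "resources": {"CPU": "16", "GPU": "8", "memory": "2Gi", "CustomResource": "99"},
    "labels": {"instance-type": "mx5"},
    "rayStartParams": {},
    "template": {"spec": {"containers": [{"resources": {}}]}},
}

# Legacy form: rayStartParams, where `resources` is an escaped JSON string and
# `labels` is a comma-separated key=value string.
worker_group_legacy = {
    "groupName": "small-group",
    "rayStartParams": {
        "resources": '"{"Custom1": 1, "Custom2": 5}"',
        "labels": "instance-type=n2",
    },
    "template": {"spec": {"containers": [{"resources": {}}]}},
}

# Rough idea of how the legacy string ends up as a dict (the real helper's
# normalization is more involved; this only illustrates the final shape):
raw = worker_group_legacy["rayStartParams"]["resources"]
custom = json.loads(raw.strip('"'))
assert custom == {"Custom1": 1, "Custom2": 5}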