Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 110 additions & 38 deletions python/ray/autoscaler/_private/kuberay/autoscaling_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import requests

from ray._private.label_utils import parse_node_labels_string
from ray._private.label_utils import (
validate_node_label_syntax,
)
from ray.autoscaler._private.constants import (
DISABLE_LAUNCH_CONFIG_CHECK_KEY,
DISABLE_NODE_UPDATERS_KEY,
Expand Down Expand Up @@ -225,22 +227,28 @@ def _get_ray_resources_from_group_spec(
group_spec: Dict[str, Any], is_head: bool
) -> Dict[str, int]:
"""
Infers Ray resources from rayStartCommands and K8s limits.
Infers Ray resources from group `Resources` field, rayStartCommands, or K8s limits.
The resources extracted are used in autoscaling calculations.

TODO: Expose a better interface in the RayCluster CRD for Ray resource annotations.
For now, we take the rayStartParams as the primary source of truth.
"""
# Set resources from top-level group 'Resources' field if it exists.
group_resources = group_spec.get("resources", {})

ray_start_params = group_spec.get("rayStartParams", {})
# In KubeRay, Ray container is always the first application container of a Ray Pod.
k8s_resources = group_spec["template"]["spec"]["containers"][0].get("resources", {})
group_name = _HEAD_GROUP_NAME if is_head else group_spec["groupName"]

num_cpus = _get_num_cpus(ray_start_params, k8s_resources, group_name)
num_gpus = _get_num_gpus(ray_start_params, k8s_resources, group_name)
custom_resource_dict = _get_custom_resources(ray_start_params, group_name)
num_tpus = _get_num_tpus(custom_resource_dict, k8s_resources)
memory = _get_memory(ray_start_params, k8s_resources)
num_cpus = _get_num_cpus(
group_resources, ray_start_params, k8s_resources, group_name
)
num_gpus = _get_num_gpus(
group_resources, ray_start_params, k8s_resources, group_name
)
custom_resource_dict = _get_custom_resources(
group_resources, ray_start_params, group_name
)
num_tpus = _get_num_tpus(group_resources, custom_resource_dict, k8s_resources)
memory = _get_memory(group_resources, ray_start_params, k8s_resources)

# It's not allowed to use object store memory as a resource request, so we don't
# add that to the autoscaler's resources annotations.
Expand Down Expand Up @@ -302,34 +310,54 @@ def _get_ray_resources_from_group_spec(

def _get_labels_from_group_spec(group_spec: Dict[str, Any]) -> Dict[str, str]:
"""
Parses Ray node labels from rayStartParams for autoscaling config.
Labels are a comma-separated string of key-value pairs.
Parses Ray node labels for the autoscaling config based on the following
priority:
1. Top-level `labels` field in the group spec.
2. `labels` field in `rayStartParams`.
"""
labels_dict = {}

ray_start_params = group_spec.get("rayStartParams", {})
labels_str = ray_start_params.get("labels")
if labels_str:
logger.warning(
f"Ignoring labels: {labels_str} set in rayStartParams. Group labels are supported in the top-level Labels field starting in Kuberay v1.5"
)

if not labels_str:
return {}
# Check for top-level structured Labels field.
if "labels" in group_spec and isinstance(group_spec.get("labels"), dict):
labels_dict = group_spec.get("labels")
# Validate node labels follow expected Kubernetes label syntax.
if labels_dict:
try:
validate_node_label_syntax(labels_dict)
except ValueError as e:
group_name = group_spec.get("groupName", _HEAD_GROUP_NAME)
logger.error(
f"Error parsing `labels`: {labels_dict} for group {group_name}: {e}"
)
# Return an empty dict when failed to parse labels.
return {}

try:
return parse_node_labels_string(labels_str)
except ValueError as e:
group_name = group_spec.get("groupName", _HEAD_GROUP_NAME)
logger.error(
f"Error parsing `labels`: {labels_str} in rayStartParams for group {group_name}: {e}"
)
# Return an empty dict when failed to parse labels.
return {}
return labels_dict


def _get_num_cpus(
group_resources: Dict[str, str],
ray_start_params: Dict[str, str],
k8s_resources: Dict[str, Dict[str, str]],
group_name: str,
) -> int:
"""Get CPU annotation from ray_start_params or k8s_resources,
with priority for ray_start_params.
"""Get CPU annotation from `resources` field, ray_start_params or k8s_resources,
with priority for `resources` field.
"""
if "CPU" in group_resources:
if "num-cpus" in ray_start_params:
logger.warning(
f"'CPU' specified in both the top-level 'resources' field and in 'rayStartParams'. "
f"Using the value from 'resources': {group_resources['CPU']}."
)
return _round_up_k8s_quantity(group_resources["CPU"])
if "num-cpus" in ray_start_params:
return int(ray_start_params["num-cpus"])
elif "cpu" in k8s_resources.get("limits", {}):
Expand All @@ -348,11 +376,20 @@ def _get_num_cpus(


def _get_memory(
ray_start_params: Dict[str, str], k8s_resources: Dict[str, Dict[str, str]]
group_resources: Dict[str, str],
ray_start_params: Dict[str, str],
k8s_resources: Dict[str, Dict[str, str]],
) -> Optional[int]:
"""Get memory resource annotation from ray_start_params or k8s_resources,
with priority for ray_start_params.
"""Get memory resource annotation from `resources` field, ray_start_params or k8s_resources,
with priority for `resources` field.
"""
if "memory" in group_resources:
if "memory" in ray_start_params:
logger.warning(
f"'memory' specified in both the top-level 'resources' field and in 'rayStartParams'. "
f"Using the value from 'resources': {group_resources['memory']}."
)
return _round_up_k8s_quantity(group_resources["memory"])
if "memory" in ray_start_params:
return int(ray_start_params["memory"])
elif "memory" in k8s_resources.get("limits", {}):
Expand All @@ -365,15 +402,22 @@ def _get_memory(


def _get_num_gpus(
group_resources: Dict[str, str],
ray_start_params: Dict[str, str],
k8s_resources: Dict[str, Dict[str, str]],
group_name: str,
) -> Optional[int]:
"""Get memory resource annotation from ray_start_params or k8s_resources,
with priority for ray_start_params.
"""Get GPU resource annotation from `resources` field, ray_start_params or k8s_resources,
with priority for `resources` field.
"""

if "num-gpus" in ray_start_params:
if "GPU" in group_resources:
if "num-gpus" in ray_start_params:
logger.warning(
f"'GPU' specified in both the top-level 'resources' field and in 'rayStartParams'. "
f"Using the value from 'resources': {group_resources['GPU']}."
)
return _round_up_k8s_quantity(group_resources["GPU"])
elif "num-gpus" in ray_start_params:
return int(ray_start_params["num-gpus"])
else:
for key, resource_quantity in chain(
Expand All @@ -394,13 +438,16 @@ def _get_num_gpus(


def _get_num_tpus(
group_resources: Dict[str, str],
custom_resource_dict: Dict[str, int],
k8s_resources: Dict[str, Dict[str, str]],
) -> Optional[int]:
"""Get TPU custom resource annotation from custom_resource_dict in ray_start_params,
or k8s_resources, with priority for custom_resource_dict.
"""Get TPU custom resource annotation from `resources` field, custom_resource_dict in ray_start_params,
or k8s_resources, with priority for `resources` field.
"""
if "TPU" in custom_resource_dict:
if "TPU" in group_resources:
return _round_up_k8s_quantity(group_resources["TPU"])
elif "TPU" in custom_resource_dict:
return custom_resource_dict["TPU"]
else:
for typ in ["limits", "requests"]:
Expand Down Expand Up @@ -430,17 +477,42 @@ def _round_up_k8s_quantity(quantity: str) -> int:


def _get_custom_resources(
ray_start_params: Dict[str, Any], group_name: str
group_resources: Dict[str, str], ray_start_params: Dict[str, Any], group_name: str
) -> Dict[str, int]:
"""Format custom resources based on the `resources` Ray start param.
"""Format custom resources based on the group `resources` field or `resources` Ray start param.

Currently, the value of the `resources` field must
Currently, the value of the rayStartParam `resources` field must
be formatted as follows:
'"{\"Custom1\": 1, \"Custom2\": 5}"'.

This method first converts the input to a correctly formatted
json string and then loads that json string to a dict.
"""
# If the top-level `resources` field is defined, use it as the exclusive source.
if group_resources:
if "resources" in ray_start_params:
logger.warning(
f"custom resources specified in both the top-level 'resources' field and in 'rayStartParams'. "
f"Using the values from 'resources': {group_resources}."
)
standard_keys = {"CPU", "GPU", "memory"}
try:
custom_resources = {
k: _round_up_k8s_quantity(v)
for k, v in group_resources.items()
if k not in standard_keys
}
except Exception as e:
logger.error(
f"Error reading `resource` for group {group_name}."
" For the correct format, refer to example configuration at "
"https://github.com/ray-project/ray/blob/master/python/"
"ray/autoscaler/kuberay/ray-cluster.complete.yaml."
)
raise e
return custom_resources

# Otherwise, check rayStartParams.
if "resources" not in ray_start_params:
return {}
resources_string = ray_start_params["resources"]
Expand Down
Loading