diff --git a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
index 8960dd2df89f..cb5f9e262a7b 100644
--- a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
+++ b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
@@ -7,6 +7,7 @@
 import requests
 
+from ray._private.label_utils import parse_node_labels_string
 from ray.autoscaler._private.constants import (
     DISABLE_LAUNCH_CONFIG_CHECK_KEY,
     DISABLE_NODE_UPDATERS_KEY,
@@ -201,6 +202,7 @@ def _node_type_from_group_spec(
     max_workers = group_spec["maxReplicas"] * group_spec.get("numOfHosts", 1)
     resources = _get_ray_resources_from_group_spec(group_spec, is_head)
+    labels = _get_labels_from_group_spec(group_spec)
 
     node_type = {
         "min_workers": min_workers,
@@ -209,6 +211,7 @@
         # Pod config data is required by the operator but not by the autoscaler.
         "node_config": {},
         "resources": resources,
+        "labels": labels,
     }
 
     idle_timeout_s = group_spec.get(IDLE_SECONDS_KEY)
@@ -297,6 +300,28 @@
     return resources
 
 
+def _get_labels_from_group_spec(group_spec: Dict[str, Any]) -> Dict[str, str]:
+    """
+    Parses Ray node labels from rayStartParams for the autoscaling config.
+    Labels are specified as a comma-separated string of key=value pairs.
+    """
+    ray_start_params = group_spec.get("rayStartParams", {})
+    labels_str = ray_start_params.get("labels")
+
+    if not labels_str:
+        return {}
+
+    try:
+        return parse_node_labels_string(labels_str)
+    except ValueError as e:
+        group_name = group_spec.get("groupName", _HEAD_GROUP_NAME)
+        logger.error(
+            f"Error parsing `labels`: {labels_str} in rayStartParams for group {group_name}: {e}"
+        )
+        # Fall back to an empty dict when the labels fail to parse.
+        return {}
+
+
 def _get_num_cpus(
     ray_start_params: Dict[str, str],
     k8s_resources: Dict[str, Dict[str, str]],
diff --git a/python/ray/tests/kuberay/test_autoscaling_config.py b/python/ray/tests/kuberay/test_autoscaling_config.py
index 61a886d96758..8dc63b4192c3 100644
--- a/python/ray/tests/kuberay/test_autoscaling_config.py
+++ b/python/ray/tests/kuberay/test_autoscaling_config.py
@@ -74,6 +74,7 @@ def _get_basic_autoscaling_config() -> dict:
         },
         "available_node_types": {
             "headgroup": {
+                "labels": {},
                 "max_workers": 0,
                 "min_workers": 0,
                 "node_config": {},
@@ -85,6 +86,7 @@
                 },
             },
             "small-group": {
+                "labels": {},
                 "max_workers": 300,
                 "min_workers": 0,
                 "node_config": {},
@@ -98,6 +100,7 @@
             # Same as "small-group" with a GPU resource entry added
             # and modified max_workers.
             "gpu-group": {
+                "labels": {},
                 "max_workers": 200,
                 "min_workers": 0,
                 "node_config": {},
@@ -112,6 +115,7 @@
             # Same as "small-group" with a TPU resource entry added
             # and modified max_workers and node_config.
             "tpu-group": {
+                "labels": {},
                 "max_workers": 8,
                 "min_workers": 0,
                 "node_config": {},
@@ -238,6 +242,45 @@ def _get_ray_cr_with_only_requests() -> dict:
     return cr
 
 
+def _get_ray_cr_with_labels() -> dict:
+    """CR with labels in rayStartParams of head and worker groups."""
+    cr = get_basic_ray_cr()
+
+    # Pass invalid labels to the head group to test error handling.
+    cr["spec"]["headGroupSpec"]["rayStartParams"]["labels"] = "!!ray.io/node-group=,"
+    # Pass valid labels to each of the worker groups.
+ cr["spec"]["workerGroupSpecs"][0]["rayStartParams"][ + "labels" + ] = "ray.io/availability-region=us-central2, ray.io/market-type=spot" + cr["spec"]["workerGroupSpecs"][1]["rayStartParams"][ + "labels" + ] = "ray.io/accelerator-type=A100" + cr["spec"]["workerGroupSpecs"][2]["rayStartParams"][ + "labels" + ] = "ray.io/accelerator-type=TPU-V4" + return cr + + +def _get_autoscaling_config_with_labels() -> dict: + """Autoscaling config with parsed labels for each group.""" + config = _get_basic_autoscaling_config() + + # Since we passed invalid labels to the head group `rayStartParams`, + # we expect an empty dictionary in the autoscaling config. + config["available_node_types"]["headgroup"]["labels"] = {} + config["available_node_types"]["small-group"]["labels"] = { + "ray.io/availability-region": "us-central2", + "ray.io/market-type": "spot", + } + config["available_node_types"]["gpu-group"]["labels"] = { + "ray.io/accelerator-type": "A100" + } + config["available_node_types"]["tpu-group"]["labels"] = { + "ray.io/accelerator-type": "TPU-V4" + } + return config + + def _get_autoscaling_config_with_options() -> dict: config = _get_basic_autoscaling_config() config["upscaling_speed"] = 1 @@ -359,6 +402,14 @@ def test_resource_quantity(input: str, output: int): None, id="tpu-k8s-resource-limit-and-custom-resource", ), + pytest.param( + _get_ray_cr_with_labels(), + _get_autoscaling_config_with_labels(), + None, + None, + None, + id="groups-with-labels", + ), ] )