diff --git a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
index cb5f9e262a7b..e61f453d87c8 100644
--- a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
+++ b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
@@ -7,7 +7,9 @@
 
 import requests
 
-from ray._private.label_utils import parse_node_labels_string
+from ray._private.label_utils import (
+    validate_node_label_syntax,
+)
 from ray.autoscaler._private.constants import (
     DISABLE_LAUNCH_CONFIG_CHECK_KEY,
     DISABLE_NODE_UPDATERS_KEY,
@@ -225,22 +227,28 @@
 def _get_ray_resources_from_group_spec(
     group_spec: Dict[str, Any], is_head: bool
 ) -> Dict[str, int]:
     """
-    Infers Ray resources from rayStartCommands and K8s limits.
+    Infers Ray resources from the group's `resources` field, rayStartParams, or
+    K8s limits.
     The resources extracted are used in autoscaling calculations.
-
-    TODO: Expose a better interface in the RayCluster CRD for Ray resource annotations.
-    For now, we take the rayStartParams as the primary source of truth.
     """
+    # Set resources from the top-level group `resources` field if it exists.
+    group_resources = group_spec.get("resources", {})
+
     ray_start_params = group_spec.get("rayStartParams", {})
     # In KubeRay, Ray container is always the first application container of a Ray Pod.
     k8s_resources = group_spec["template"]["spec"]["containers"][0].get("resources", {})
     group_name = _HEAD_GROUP_NAME if is_head else group_spec["groupName"]
-    num_cpus = _get_num_cpus(ray_start_params, k8s_resources, group_name)
-    num_gpus = _get_num_gpus(ray_start_params, k8s_resources, group_name)
-    custom_resource_dict = _get_custom_resources(ray_start_params, group_name)
-    num_tpus = _get_num_tpus(custom_resource_dict, k8s_resources)
-    memory = _get_memory(ray_start_params, k8s_resources)
+    num_cpus = _get_num_cpus(
+        group_resources, ray_start_params, k8s_resources, group_name
+    )
+    num_gpus = _get_num_gpus(
+        group_resources, ray_start_params, k8s_resources, group_name
+    )
+    custom_resource_dict = _get_custom_resources(
+        group_resources, ray_start_params, group_name
+    )
+    num_tpus = _get_num_tpus(group_resources, custom_resource_dict, k8s_resources)
+    memory = _get_memory(group_resources, ray_start_params, k8s_resources)
 
     # It's not allowed to use object store memory as a resource request, so we don't
     # add that to the autoscaler's resources annotations.
@@ -302,34 +310,45 @@ def _get_ray_resources_from_group_spec(
 
 def _get_labels_from_group_spec(group_spec: Dict[str, Any]) -> Dict[str, str]:
     """
-    Parses Ray node labels from rayStartParams for autoscaling config.
-    Labels are a comma-separated string of key-value pairs.
+    Parses Ray node labels for the autoscaling config from the top-level `labels`
+    field of the group spec. A `labels` entry in rayStartParams is ignored with a
+    warning, since group labels are read from the top-level field starting with
+    KubeRay v1.5.
     """
+    labels_dict = {}
+
     ray_start_params = group_spec.get("rayStartParams", {})
     labels_str = ray_start_params.get("labels")
+    if labels_str:
+        logger.warning(
+            f"Ignoring labels: {labels_str} set in rayStartParams. Group labels are supported in the top-level Labels field starting in KubeRay v1.5"
+        )
 
-    if not labels_str:
-        return {}
+    # Check for the top-level structured `labels` field.
+    if "labels" in group_spec and isinstance(group_spec.get("labels"), dict):
+        labels_dict = group_spec.get("labels")
+        # Validate that node labels follow the expected Kubernetes label syntax.
+        validate_node_label_syntax(labels_dict)
 
-    try:
-        return parse_node_labels_string(labels_str)
-    except ValueError as e:
-        group_name = group_spec.get("groupName", _HEAD_GROUP_NAME)
-        logger.error(
-            f"Error parsing `labels`: {labels_str} in rayStartParams for group {group_name}: {e}"
-        )
-        # Return an empty dict when failed to parse labels.
-        return {}
+    return labels_dict
 
 
 def _get_num_cpus(
+    group_resources: Dict[str, str],
     ray_start_params: Dict[str, str],
     k8s_resources: Dict[str, Dict[str, str]],
     group_name: str,
 ) -> int:
-    """Get CPU annotation from ray_start_params or k8s_resources,
-    with priority for ray_start_params.
+    """Get the CPU annotation from the `resources` field, ray_start_params, or
+    k8s_resources, with priority given to the `resources` field.
     """
+    if "CPU" in group_resources:
+        if "num-cpus" in ray_start_params:
+            logger.warning(
+                f"'CPU' specified in both the top-level 'resources' field and in 'rayStartParams'. "
+                f"Using the value from 'resources': {group_resources['CPU']}."
+            )
+        return _round_up_k8s_quantity(group_resources["CPU"])
     if "num-cpus" in ray_start_params:
         return int(ray_start_params["num-cpus"])
     elif "cpu" in k8s_resources.get("limits", {}):
@@ -348,11 +367,20 @@ def _get_num_cpus(
 
 
 def _get_memory(
-    ray_start_params: Dict[str, str], k8s_resources: Dict[str, Dict[str, str]]
+    group_resources: Dict[str, str],
+    ray_start_params: Dict[str, str],
+    k8s_resources: Dict[str, Dict[str, str]],
 ) -> Optional[int]:
-    """Get memory resource annotation from ray_start_params or k8s_resources,
-    with priority for ray_start_params.
+    """Get the memory resource annotation from the `resources` field, ray_start_params,
+    or k8s_resources, with priority given to the `resources` field.
     """
+    if "memory" in group_resources:
+        if "memory" in ray_start_params:
+            logger.warning(
+                f"'memory' specified in both the top-level 'resources' field and in 'rayStartParams'. "
+                f"Using the value from 'resources': {group_resources['memory']}."
+            )
+        return _round_up_k8s_quantity(group_resources["memory"])
     if "memory" in ray_start_params:
         return int(ray_start_params["memory"])
     elif "memory" in k8s_resources.get("limits", {}):
@@ -365,15 +393,22 @@
 
 
 def _get_num_gpus(
+    group_resources: Dict[str, str],
     ray_start_params: Dict[str, str],
     k8s_resources: Dict[str, Dict[str, str]],
     group_name: str,
 ) -> Optional[int]:
-    """Get memory resource annotation from ray_start_params or k8s_resources,
-    with priority for ray_start_params.
+    """Get the GPU resource annotation from the `resources` field, ray_start_params,
+    or k8s_resources, with priority given to the `resources` field.
     """
-
-    if "num-gpus" in ray_start_params:
+    if "GPU" in group_resources:
+        if "num-gpus" in ray_start_params:
+            logger.warning(
+                f"'GPU' specified in both the top-level 'resources' field and in 'rayStartParams'. "
+                f"Using the value from 'resources': {group_resources['GPU']}."
+            )
+        return _round_up_k8s_quantity(group_resources["GPU"])
+    elif "num-gpus" in ray_start_params:
         return int(ray_start_params["num-gpus"])
     else:
         for key, resource_quantity in chain(
@@ -394,13 +429,16 @@
 
 
 def _get_num_tpus(
+    group_resources: Dict[str, str],
    custom_resource_dict: Dict[str, int],
     k8s_resources: Dict[str, Dict[str, str]],
 ) -> Optional[int]:
-    """Get TPU custom resource annotation from custom_resource_dict in ray_start_params,
-    or k8s_resources, with priority for custom_resource_dict.
+    """Get the TPU annotation from the `resources` field, custom_resource_dict (from
+    ray_start_params), or k8s_resources, with priority given to the `resources` field.
     """
-    if "TPU" in custom_resource_dict:
+    if "TPU" in group_resources:
+        return _round_up_k8s_quantity(group_resources["TPU"])
+    elif "TPU" in custom_resource_dict:
         return custom_resource_dict["TPU"]
     else:
         for typ in ["limits", "requests"]:
@@ -430,17 +468,42 @@ def _round_up_k8s_quantity(quantity: str) -> int:
 
 
 def _get_custom_resources(
-    ray_start_params: Dict[str, Any], group_name: str
+    group_resources: Dict[str, str], ray_start_params: Dict[str, Any], group_name: str
 ) -> Dict[str, int]:
-    """Format custom resources based on the `resources` Ray start param.
+    """Format custom resources based on the group `resources` field or the `resources` Ray start param.
 
-    Currently, the value of the `resources` field must
+    Currently, the value of the rayStartParams `resources` field must
     be formatted as follows: '"{\"Custom1\": 1, \"Custom2\": 5}"'.
 
     This method first converts the input to a
     correctly formatted json string and then loads that json string to a dict.
     """
+    # If the top-level `resources` field is defined, use it as the exclusive source.
+    if group_resources:
+        if "resources" in ray_start_params:
+            logger.warning(
+                f"custom resources specified in both the top-level 'resources' field and in 'rayStartParams'. "
+                f"Using the values from 'resources': {group_resources}."
+            )
+        standard_keys = {"CPU", "GPU", "TPU", "memory"}
+        try:
+            custom_resources = {
+                k: _round_up_k8s_quantity(v)
+                for k, v in group_resources.items()
+                if k not in standard_keys
+            }
+        except Exception as e:
+            logger.error(
+                f"Error reading `resources` for group {group_name}."
+                " For the correct format, refer to example configuration at "
+                "https://github.com/ray-project/ray/blob/master/python/"
+                "ray/autoscaler/kuberay/ray-cluster.complete.yaml."
+            )
+            raise e
+        return custom_resources
+
+    # Otherwise, check rayStartParams.
     if "resources" not in ray_start_params:
         return {}
 
     resources_string = ray_start_params["resources"]
diff --git a/python/ray/tests/kuberay/test_autoscaling_config.py b/python/ray/tests/kuberay/test_autoscaling_config.py
index 8dc63b4192c3..58db9a8a6b98 100644
--- a/python/ray/tests/kuberay/test_autoscaling_config.py
+++ b/python/ray/tests/kuberay/test_autoscaling_config.py
@@ -223,6 +223,72 @@ def _get_ray_cr_with_tpu_k8s_resource_limit_and_custom_resource() -> dict:
     return cr
 
 
+def _get_ray_cr_with_top_level_labels() -> dict:
+    """CR with a top-level `labels` field."""
+    cr = get_basic_ray_cr()
+    # The top-level structured labels take priority.
+    cr["spec"]["workerGroupSpecs"][0]["labels"] = {"instance-type": "mx5"}
+
+    # The `labels` field in rayStartParams should be ignored.
+    cr["spec"]["workerGroupSpecs"][0]["rayStartParams"]["labels"] = "instance-type=n2"
+    return cr
+
+
+def _get_autoscaling_config_with_top_level_labels() -> dict:
+    config = _get_basic_autoscaling_config()
+    config["available_node_types"]["small-group"]["labels"] = {"instance-type": "mx5"}
+    return config
+
+
+def _get_ray_cr_with_invalid_top_level_labels() -> dict:
+    """CR with a syntactically invalid top-level `labels` field."""
+    cr = get_basic_ray_cr()
+    cr["spec"]["workerGroupSpecs"][0]["labels"] = {"!!invalid-key!!": "some-value"}
+    return cr
+
+
+def _get_ray_cr_with_top_level_resources() -> dict:
+    """CR with a top-level `resources` field to test priority."""
+    cr = get_basic_ray_cr()
+
+    # The top-level `resources` field should take priority.
+    cr["spec"]["workerGroupSpecs"][1]["resources"] = {
+        "CPU": "16",
+        "GPU": "8",
+        "memory": "2Gi",
+        "CustomResource": "99",
+    }
+    # These rayStartParams should be ignored.
+    cr["spec"]["workerGroupSpecs"][1]["rayStartParams"]["num-cpus"] = "1"
+    cr["spec"]["workerGroupSpecs"][1]["rayStartParams"]["memory"] = "100000"
+    cr["spec"]["workerGroupSpecs"][1]["rayStartParams"]["num-gpus"] = "2"
+    cr["spec"]["workerGroupSpecs"][1]["rayStartParams"][
+        "resources"
+    ] = '"{"Custom2": 1}"'
+    return cr
+
+
+def _get_autoscaling_config_with_top_level_resources() -> dict:
+    config = _get_basic_autoscaling_config()
+
+    config["available_node_types"]["gpu-group"]["resources"] = {
+        "CPU": 16,
+        "GPU": 8,
+        "memory": 2147483648,
+        "CustomResource": 99,
+    }
+    return config
+
+
+def _get_ray_cr_with_top_level_tpu_resource() -> dict:
+    """CR with a top-level `resources` field for the TPU custom resource."""
+    cr = _get_ray_cr_with_tpu_k8s_resource_limit_and_custom_resource()
+
+    # The top-level field should take priority.
+    cr["spec"]["workerGroupSpecs"][2]["resources"] = {"TPU": "8"}
+    return cr
+
+
 def _get_ray_cr_with_no_tpus() -> dict:
     cr = get_basic_ray_cr()
     # remove TPU worker group
@@ -404,11 +470,27 @@ def test_resource_quantity(input: str, output: int):
         ),
         pytest.param(
             _get_ray_cr_with_labels(),
-            _get_autoscaling_config_with_labels(),
+            _get_basic_autoscaling_config(),
             None,
             None,
+            "Ignoring labels: ray.io/accelerator-type=TPU-V4 set in rayStartParams. Group labels are supported in the top-level Labels field starting in KubeRay v1.5",
+            id="groups-with-raystartparam-labels",
+        ),
+        pytest.param(
+            _get_ray_cr_with_top_level_labels(),
+            _get_autoscaling_config_with_top_level_labels(),
+            None,
             None,
-            id="groups-with-labels",
+            "Ignoring labels: instance-type=n2 set in rayStartParams. Group labels are supported in the top-level Labels field starting in KubeRay v1.5",
+            id="groups-with-top-level-labels",
+        ),
+        pytest.param(
+            _get_ray_cr_with_invalid_top_level_labels(),
+            _get_basic_autoscaling_config(),
+            ValueError,
+            None,
+            None,
+            id="invalid-top-level-labels",
         ),
     ]
 )
@@ -604,6 +686,11 @@ def test_tpu_node_selectors_to_type(
             0,
             id="no-tpus-requested",
         ),
+        pytest.param(
+            _get_ray_cr_with_top_level_tpu_resource(),
+            8,
+            id="tpu-top-level-resource",
+        ),
     ]
 )
@@ -613,13 +700,14 @@ def test_tpu_node_selectors_to_type(
 def test_get_num_tpus(ray_cr_in: Dict[str, Any], expected_num_tpus: int):
     """Verify that _get_num_tpus correctly returns the number of requested TPUs."""
     for worker_group in ray_cr_in["spec"]["workerGroupSpecs"]:
+        group_resources = worker_group.get("resources", {})
         ray_start_params = worker_group["rayStartParams"]
         custom_resources = _get_custom_resources(
-            ray_start_params, worker_group["groupName"]
+            group_resources, ray_start_params, worker_group["groupName"]
        )
         k8s_resources = worker_group["template"]["spec"]["containers"][0]["resources"]
-        num_tpus = _get_num_tpus(custom_resources, k8s_resources)
+        num_tpus = _get_num_tpus(group_resources, custom_resources, k8s_resources)
 
         if worker_group["groupName"] == "tpu-group":
             assert num_tpus == expected_num_tpus
@@ -735,5 +823,41 @@ def test_get_ray_resources_from_group_spec(
     assert _get_ray_resources_from_group_spec(group_spec, is_head) == expected_resources
 
 
+@pytest.mark.skipif(platform.system() == "Windows", reason="Not relevant.")
+def test_top_level_resources_override_warnings():
+    """
+    Verify all override warnings are logged when a top-level `resources` field is used in
+    addition to specifying those resources in the rayStartParams.
+    """
+    ray_cr_in = _get_ray_cr_with_top_level_resources()
+    ray_cr_in["metadata"]["namespace"] = "default"
+
+    with mock.patch(f"{AUTOSCALING_CONFIG_MODULE_PATH}.logger") as mock_logger:
+        _derive_autoscaling_config_from_ray_cr(ray_cr_in)
+
+    expected_calls = [
+        mock.call(
+            "'CPU' specified in both the top-level 'resources' field and in 'rayStartParams'. "
+            "Using the value from 'resources': 16."
+        ),
+        mock.call(
+            "'GPU' specified in both the top-level 'resources' field and in 'rayStartParams'. "
+            "Using the value from 'resources': 8."
+        ),
+        mock.call(
+            "'memory' specified in both the top-level 'resources' field and in 'rayStartParams'. "
+            "Using the value from 'resources': 2Gi."
+        ),
+        mock.call(
+            "custom resources specified in both the top-level 'resources' field and in 'rayStartParams'. "
+            "Using the values from 'resources': {'CPU': '16', 'GPU': '8', 'memory': '2Gi', 'CustomResource': '99'}."
+        ),
+    ]
+
+    # Assert that all expected calls were made, in any order.
+    mock_logger.warning.assert_has_calls(expected_calls, any_order=True)
+    assert mock_logger.warning.call_count == 4
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
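
For reviewers, a minimal, self-contained sketch of the precedence this patch gives the new top-level `resources` field: top-level `resources` first, then rayStartParams, then the Ray container's Kubernetes resources. The helper name `pick_num_cpus` and the example group spec are illustrative only, and the sketch skips the warning logging and Kubernetes-quantity rounding (`_round_up_k8s_quantity`) that the real `_get_num_cpus` performs.

from typing import Any, Dict, Optional


def pick_num_cpus(group_spec: Dict[str, Any]) -> Optional[int]:
    # Illustrative only: mirrors the precedence in _get_num_cpus, not the actual code.
    group_resources = group_spec.get("resources", {})
    ray_start_params = group_spec.get("rayStartParams", {})
    k8s_resources = group_spec["template"]["spec"]["containers"][0].get("resources", {})

    if "CPU" in group_resources:
        # 1. Highest priority: the new top-level `resources` field on the group spec.
        return int(group_resources["CPU"])
    if "num-cpus" in ray_start_params:
        # 2. Fall back to the legacy rayStartParams annotation.
        return int(ray_start_params["num-cpus"])
    for kind in ("limits", "requests"):
        if "cpu" in k8s_resources.get(kind, {}):
            # 3. Finally, infer from the container's Kubernetes resources.
            #    (The real code rounds quantities such as "500m" up; this sketch
            #    assumes plain integer strings.)
            return int(k8s_resources[kind]["cpu"])
    return None


example_group_spec = {
    "resources": {"CPU": "16"},  # wins
    "rayStartParams": {"num-cpus": "1"},  # ignored; the real code also logs a warning
    "template": {"spec": {"containers": [{"resources": {"limits": {"cpu": "4"}}}]}},
}
assert pick_num_cpus(example_group_spec) == 16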