diff --git a/ci/lint/pydoclint-baseline.txt b/ci/lint/pydoclint-baseline.txt index 30c316914ba9..d115260f634d 100644 --- a/ci/lint/pydoclint-baseline.txt +++ b/ci/lint/pydoclint-baseline.txt @@ -263,22 +263,6 @@ python/ray/_private/test_utils.py DOC101: Function `monitor_memory_usage`: Docstring contains fewer arguments than in function signature. DOC103: Function `monitor_memory_usage`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [print_interval_s: int, record_interval_s: int]. Arguments in the docstring but not in the function signature: [interval_s: ]. -------------------- -python/ray/_private/usage/usage_lib.py - DOC201: Function `record_extra_usage_tag` does not have a return section in docstring - DOC201: Function `_generate_cluster_metadata` does not have a return section in docstring - DOC106: Function `put_cluster_metadata`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature - DOC107: Function `put_cluster_metadata`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints - DOC101: Function `get_extra_usage_tags_to_report`: Docstring contains fewer arguments than in function signature. - DOC106: Function `get_extra_usage_tags_to_report`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature - DOC107: Function `get_extra_usage_tags_to_report`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints - DOC103: Function `get_extra_usage_tags_to_report`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [gcs_client: ]. - DOC106: Function `_get_cluster_status_to_report_v2`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature - DOC107: Function `_get_cluster_status_to_report_v2`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints - DOC106: Function `get_cluster_status_to_report`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature - DOC107: Function `get_cluster_status_to_report`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints - DOC106: Function `get_cluster_metadata`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature - DOC107: Function `get_cluster_metadata`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints --------------------- python/ray/_private/utils.py DOC101: Function `format_error_message`: Docstring contains fewer arguments than in function signature. DOC103: Function `format_error_message`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [task_exception: bool]. 
@@ -766,8 +750,6 @@ python/ray/autoscaler/v2/utils.py DOC107: Method `ProtobufUtil.to_dict`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints DOC106: Method `ProtobufUtil.to_dict_list`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature DOC107: Method `ProtobufUtil.to_dict_list`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints - DOC101: Method `ResourceRequestUtil.make`: Docstring contains fewer arguments than in function signature. - DOC103: Method `ResourceRequestUtil.make`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [constraints: Optional[List[Tuple[PlacementConstraintType, str, str]]]]. DOC103: Method `ClusterStatusFormatter._constraint_report`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [cluster_constraint_demand: List[ClusterConstraintDemand]]. Arguments in the docstring but not in the function signature: [data: ]. -------------------- python/ray/client_builder.py @@ -1571,11 +1553,6 @@ python/ray/llm/_internal/common/utils/download_utils.py python/ray/llm/_internal/serve/configs/openai_api_models.py DOC201: Function `to_model_metadata` does not have a return section in docstring -------------------- -python/ray/llm/_internal/serve/deployments/llm/multiplex/utils.py - DOC201: Function `retry_with_exponential_backoff` does not have a return section in docstring - DOC101: Function `get_object_from_cloud`: Docstring contains fewer arguments than in function signature. - DOC103: Function `get_object_from_cloud`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [object_uri: str]. --------------------- python/ray/llm/_internal/serve/deployments/routers/router.py DOC101: Method `LLMRouter.completions`: Docstring contains fewer arguments than in function signature. DOC103: Method `LLMRouter.completions`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [body: CompletionRequest]. 
diff --git a/python/ray/autoscaler/v2/event_logger.py b/python/ray/autoscaler/v2/event_logger.py index 961dc37bb0ad..316378867e27 100644 --- a/python/ray/autoscaler/v2/event_logger.py +++ b/python/ray/autoscaler/v2/event_logger.py @@ -11,6 +11,7 @@ GangResourceRequest, ResourceRequest, ) +from ray.core.generated.common_pb2 import LabelSelectorOperator from ray.core.generated.instance_manager_pb2 import LaunchRequest, TerminationRequest logger = logging.getLogger(__name__) @@ -105,6 +106,21 @@ def log_cluster_scheduling_update( if idx < len(requests_by_count) - 1: log_str += ", " + # Parse and log label selectors if present + if req_count.request.label_selectors: + selector_strs = [] + for selector in req_count.request.label_selectors: + for constraint in selector.label_constraints: + op = LabelSelectorOperator.Name(constraint.operator) + values = ",".join(constraint.label_values) + selector_strs.append( + f"{constraint.label_key} {op} [{values}]" + ) + if selector_strs: + log_str += ( + " with label selectors: [" + "; ".join(selector_strs) + "]" + ) + log_str += ( ". Add suitable node types to this cluster to resolve this issue." ) diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index 369b2af8570d..641baa2c81a9 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -20,6 +20,7 @@ from ray.autoscaler.v2.instance_manager.config import NodeTypeConfig from ray.autoscaler.v2.schema import AutoscalerInstance, NodeType from ray.autoscaler.v2.utils import ProtobufUtil, ResourceRequestUtil +from ray.core.generated.common_pb2 import LabelSelectorOperator from ray.core.generated.autoscaler_pb2 import ( ClusterResourceConstraint, GangResourceRequest, @@ -275,8 +276,10 @@ def new( # Available resources for scheduling requests of different # sources. available_resources=dict(instance.ray_node.available_resources), - # Use ray node's dynamic labels. - labels=dict(instance.ray_node.dynamic_labels), + labels={ + **(instance.ray_node.labels or {}), + **(instance.ray_node.dynamic_labels or {}), + }, status=SchedulingNodeStatus.SCHEDULABLE, im_instance_id=instance.im_instance.instance_id, im_instance_status=instance.im_instance.status, @@ -437,16 +440,22 @@ def _compute_score( A "higher" score means that this node is more suitable for scheduling the current scheduled resource requests. - The score is a tuple of 4 values: - 1. Whether this node is a GPU node and the current resource request has + The score is a tuple of 5 values: + 1. Whether this node has labels matching the current resource request's + label_selector requirements: + 0: if this node does not satisfy any label selector requirements or + no label selectors are provided. + len(label_selectors)-i: a score based on the priority of the label + selector in the resource request that this node satisfies. + 2. Whether this node is a GPU node and the current resource request has GPU requirements: 0: if this node is a GPU node and the current resource request placed onto the node has no GPU requirements. 1: if this node is not a GPU node or the current resource request placed onto the node has GPU requirements. - 2. The number of resource types being scheduled. - 3. The minimum utilization rate across all resource types. - 4. The average utilization rate across all resource types. + 3. The number of resource types being scheduled. + 4. The minimum utilization rate across all resource types. + 5. The average utilization rate across all resource types. 
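Illustrative aside, not part of the patch: how the 5-value score documented above ranks candidate nodes. Python compares tuples lexicographically, so the new label-selector component dominates every later component. The scores below are invented for the example; the field order is the one listed above.

# (label_match, gpu_ok, num_matching_resource_types, min_util, avg_util)
label_matching_node = (1, True, 1, 1.0, 1.0)    # satisfies the highest-priority selector
better_packed_node = (0, True, 2, 1.0, 1.0)     # packs more resource types, but no label match
gpu_node_for_cpu_req = (0, False, 1, 0.5, 0.5)  # GPU node considered for a CPU-only request

# The label-selector component is compared first, so it wins regardless of the
# later utilization components.
assert max(label_matching_node, better_packed_node, gpu_node_for_cpu_req) == label_matching_node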
NOTE: This function is adapted from _resource_based_utilization_scorer from @@ -499,11 +508,15 @@ def _compute_score( if is_gpu_node and not any_gpu_requests: gpu_ok = False + # Check if node satisfies label requirements. + matches_labels = self._satisfies_label_constraints(sched_requests) + # Prioritize avoiding gpu nodes for non-gpu workloads first, # then prioritize matching multiple resource types, # then prioritize using all resources, # then prioritize overall balance of multiple resources. return ( + matches_labels, gpu_ok, num_matching_resource_types, min(util_by_resources) if util_by_resources else 0, @@ -512,6 +525,37 @@ def _compute_score( else 0, ) + def _satisfies_label_constraints( + self, sched_requests: List[ResourceRequest] + ) -> int: + """Returns a higher value based on the priority of the label selector this node + satisfies (first returns highest score, decreasing sequentially for fallback), 0 otherwise.""" + for req in sched_requests: + num_selectors = len(req.label_selectors) + for i, selector in enumerate(req.label_selectors): + all_constraints_pass = True + for constraint in selector.label_constraints: + key = constraint.label_key + values = set(constraint.label_values) + op = constraint.operator + node_val = self.labels.get(key) + + if op == LabelSelectorOperator.LABEL_OPERATOR_IN: + if node_val not in values: + all_constraints_pass = False + break + elif op == LabelSelectorOperator.LABEL_OPERATOR_NOT_IN: + if node_val in values: + all_constraints_pass = False + break + else: + all_constraints_pass = False + break + + if all_constraints_pass: + return num_selectors - i + return 0 + def _try_schedule_one( self, request: ResourceRequest, resource_request_source: ResourceRequestSource ) -> bool: @@ -528,6 +572,11 @@ def _try_schedule_one( True if the resource request is scheduled on this node. """ + # Enforce label selector constraints + if request.label_selectors: + if self._satisfies_label_constraints([request]) == 0: + return False # Node doesn't satisfy any label selector in request. + # Check if there's placement constraints that are not satisfied. for constraint in request.placement_constraints: if constraint.HasField("anti_affinity"): @@ -1347,17 +1396,24 @@ def _try_schedule( def _sort_resource_request(req: ResourceRequest) -> Tuple: """ Sort the resource requests by: - 1. The length of it's placement constraints. - 2. The number of resources it requests. - 3. The values of resources it requests. - 4. lexicographically for each resource (for stable ordering) + 1. The length of its placement constraints. + 2. The length of its first label selector constraints (if any). + 3. The number of resources it requests. + 4. The values of resources it requests. + 5. lexicographically for each resource (for stable ordering) This is a legacy sorting function for the autoscaler's binpacking algo - we do this so that we could have a deterministic scheduling results with reasonable fragmentation. """ + label_constraint_len = ( + len(req.label_selectors[0].label_constraints) + if req.label_selectors + else 0 + ) return ( len(req.placement_constraints), + label_constraint_len, len(req.resources_bundle.values()), sum(req.resources_bundle.values()), sorted(req.resources_bundle.items()), diff --git a/python/ray/autoscaler/v2/schema.py b/python/ray/autoscaler/v2/schema.py index 76eda2ec57c5..47c722c87cba 100644 --- a/python/ray/autoscaler/v2/schema.py +++ b/python/ray/autoscaler/v2/schema.py @@ -58,6 +58,8 @@ class NodeInfo: details: Optional[str] = None # Activity on the node. 
node_activity: Optional[List[str]] = None + # Ray node labels. + labels: Optional[Dict[str, str]] = None def total_resources(self) -> Dict[str, float]: if self.resource_usage is None: diff --git a/python/ray/autoscaler/v2/tests/test_e2e.py b/python/ray/autoscaler/v2/tests/test_e2e.py index 870be60a3754..4283af09a091 100644 --- a/python/ray/autoscaler/v2/tests/test_e2e.py +++ b/python/ray/autoscaler/v2/tests/test_e2e.py @@ -526,6 +526,107 @@ def nodes_up(): cluster.shutdown() +@pytest.mark.parametrize("autoscaler_v2", [True]) +def test_task_scheduled_on_node_with_label_selector(autoscaler_v2): + cluster = AutoscalingCluster( + head_resources={"CPU": 0}, + worker_node_types={ + "node1": { + "resources": {"CPU": 1}, + "node_config": {}, + "labels": {"accelerator-type": "A100", "market-type": "spot"}, + "min_workers": 0, + "max_workers": 1, + }, + "node2": { + "resources": {"CPU": 1}, + "node_config": {}, + "labels": { + "region": "us-east1", + "accelerator-type": "TPU", + "market-type": "spot", + }, + "min_workers": 0, + "max_workers": 1, + }, + "node3": { + "resources": {"CPU": 1}, + "node_config": {}, + "labels": {"accelerator-type": "B200", "market-type": "spot"}, + "min_workers": 0, + "max_workers": 1, + }, + "node4": { + "resources": {"CPU": 1}, + "node_config": {}, + "labels": {"market-type": "on-demand", "accelerator-type": "TPU"}, + "min_workers": 0, + "max_workers": 1, + }, + }, + idle_timeout_minutes=999, + autoscaler_v2=autoscaler_v2, + ) + + driver_script = """ +import ray +import time + +@ray.remote(num_cpus=1, label_selector={"accelerator-type": "A100"}) +def task1(): + print("Running task1") + time.sleep(60) + return True + +@ray.remote(num_cpus=1, label_selector={"region": "in(us-east1,me-central1)"}) +def task2(): + print("Running task2") + time.sleep(60) + return True + +@ray.remote(num_cpus=1, label_selector={"accelerator-type": "!in(A100,TPU)"}) +def task3(): + print("Running task3") + time.sleep(60) + return True + +@ray.remote(num_cpus=1, label_selector={"market-type": "!in(spot)"}) +def task4(): + print("Running task4") + time.sleep(60) + return True + +ray.init("auto") +assert(ray.get([task1.remote(), task2.remote(), task3.remote(), task4.remote()])) +""" + + try: + cluster.start() + ray.init("auto") + gcs_address = ray.get_runtime_context().gcs_address + expected_nodes = 4 + + def tasks_run(): + tasks = list_tasks() + assert len(tasks) > 0 + return True + + run_string_as_driver_nonblocking(driver_script) + wait_for_condition(tasks_run) + + def all_tasks_scheduled(): + status = get_cluster_status(gcs_address) + return len(status.active_nodes) == expected_nodes + + # All tasks with label selectors should be scheduled, scaling + # 4 nodes with the required labels. 
+ wait_for_condition(all_tasks_scheduled, timeout=60) + + finally: + ray.shutdown() + cluster.shutdown() + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/autoscaler/v2/tests/test_scheduler.py b/python/ray/autoscaler/v2/tests/test_scheduler.py index 848d153954ad..1f95f83df3b2 100644 --- a/python/ray/autoscaler/v2/tests/test_scheduler.py +++ b/python/ray/autoscaler/v2/tests/test_scheduler.py @@ -22,6 +22,7 @@ from ray.autoscaler.v2.schema import AutoscalerInstance, NodeType from ray.autoscaler.v2.tests.util import MockEventLogger, make_autoscaler_instance from ray.autoscaler.v2.utils import ResourceRequestUtil +from ray.core.generated.common_pb2 import LabelSelectorOperator from ray.core.generated.autoscaler_pb2 import ( ClusterResourceConstraint, GangResourceRequest, @@ -1930,64 +1931,138 @@ def try_schedule(node_resources: Dict, requests: List[Dict]) -> Tuple: infeasible, score = node.try_schedule(requests, source) return ResourceRequestUtil.to_resource_maps(infeasible), score - assert try_schedule({"CPU": 1}, [{"CPU": 1}]) == ([], (True, 1, 1.0, 1.0)) + assert try_schedule({"CPU": 1}, [{"CPU": 1}]) == ([], (0, True, 1, 1.0, 1.0)) - assert try_schedule({"GPU": 4}, [{"GPU": 2}]) == ([], (True, 1, 0.5, 0.5)) + assert try_schedule({"GPU": 4}, [{"GPU": 2}]) == ([], (0, True, 1, 0.5, 0.5)) assert try_schedule({"GPU": 4}, [{"GPU": 1}, {"GPU": 1}]) == ( [], - (True, 1, 0.5, 0.5), + (0, True, 1, 0.5, 0.5), + ) + assert try_schedule({"GPU": 2}, [{"GPU": 2}]) == ([], (0, True, 1, 2, 2)) + assert try_schedule({"GPU": 2}, [{"GPU": 1}, {"GPU": 1}]) == ( + [], + (0, True, 1, 2, 2), ) - assert try_schedule({"GPU": 2}, [{"GPU": 2}]) == ([], (True, 1, 2, 2)) - assert try_schedule({"GPU": 2}, [{"GPU": 1}, {"GPU": 1}]) == ([], (True, 1, 2, 2)) assert try_schedule({"GPU": 1}, [{"GPU": 1, "CPU": 1}, {"GPU": 1}]) == ( [{"GPU": 1, "CPU": 1}], - (True, 1, 1, 1), + (0, True, 1, 1, 1), ) assert try_schedule({"GPU": 1, "CPU": 1}, [{"GPU": 1, "CPU": 1}, {"GPU": 1}]) == ( [{"GPU": 1}], - (True, 2, 1, 1), + (0, True, 2, 1, 1), ) - assert try_schedule({"GPU": 2, "TPU": 1}, [{"GPU": 2}]) == ([], (True, 1, 0, 1)) - assert try_schedule({"CPU": 64}, [{"CPU": 64}]) == ([], (True, 1, 64, 64)) - assert try_schedule({"CPU": 64}, [{"CPU": 32}]) == ([], (True, 1, 8, 8)) + assert try_schedule({"GPU": 2, "TPU": 1}, [{"GPU": 2}]) == ([], (0, True, 1, 0, 1)) + assert try_schedule({"CPU": 64}, [{"CPU": 64}]) == ([], (0, True, 1, 64, 64)) + assert try_schedule({"CPU": 64}, [{"CPU": 32}]) == ([], (0, True, 1, 8, 8)) assert try_schedule({"CPU": 64}, [{"CPU": 16}, {"CPU": 16}]) == ( [], - (True, 1, 8, 8), + (0, True, 1, 8, 8), ) # GPU Scores assert try_schedule({"GPU": 1, "CPU": 1}, [{"CPU": 1}]) == ( [], - (False, 1, 0.0, 0.5), + (0, False, 1, 0.0, 0.5), ) assert try_schedule({"GPU": 1, "CPU": 1}, [{"CPU": 1, "GPU": 1}]) == ( [], - (True, 2, 1.0, 1.0), + (0, True, 2, 1.0, 1.0), ) assert try_schedule({"GPU": 1, "CPU": 1}, [{"GPU": 1}]) == ( [], - (True, 1, 0.0, 0.5), + (0, True, 1, 0.0, 0.5), ) # Zero resources assert try_schedule({"CPU": 0, "custom": 1}, [{"custom": 1}]) == ( [], - (True, 1, 1, 1), + (0, True, 1, 1, 1), ) assert try_schedule({"CPU": 0, "custom": 1}, [{"CPU": 1}]) == ( [{"CPU": 1}], - (True, 0, 0.0, 0.0), + (0, True, 0, 0.0, 0.0), ) # Implicit resources implicit_resource = ray._raylet.IMPLICIT_RESOURCE_PREFIX + "a" assert try_schedule({"CPU": 1}, [{implicit_resource: 1}]) == ( [], - (True, 0, 0.0, 0.0), + (0, True, 0, 
0.0, 0.0), ) assert try_schedule({"CPU": 1}, [{implicit_resource: 1}] * 2) == ( [{implicit_resource: 1}], - (True, 0, 0.0, 0.0), + (0, True, 0, 0.0, 0.0), + ) + + +@pytest.mark.parametrize( + "source", + [ + ResourceRequestSource.PENDING_DEMAND, + ResourceRequestSource.CLUSTER_RESOURCE_CONSTRAINT, + ], + ids=["demand", "cluster_resource_constraint"], +) +def test_node_schedule_label_selector_score(source): + def try_schedule_ls( + node_resources: Dict, + node_labels: Dict[str, str], + selectors, + ) -> Tuple: + cfg = NodeTypeConfig( + name="type_1", + resources=node_resources, + min_worker_nodes=0, + max_worker_nodes=1, + labels=node_labels, + ) + node = SchedulingNode.from_node_config( + node_config=cfg, + status=SchedulingNodeStatus.SCHEDULABLE, + node_kind=NodeKind.WORKER, + ) + req = ResourceRequestUtil.make({"CPU": 1}, label_selectors=selectors) + infeasible, score = node.try_schedule([req], source) + return ResourceRequestUtil.to_resource_maps(infeasible), score + + labels = {"ray.io/accelerator-type": "A100"} + + # 1) A matching label selector should be schedulable on node type_1 + label_selector_1 = [ + [ + ( + "ray.io/accelerator-type", + LabelSelectorOperator.LABEL_OPERATOR_IN, + ["TPU-v6e"], + ) + ], + [ + ( + "ray.io/accelerator-type", + LabelSelectorOperator.LABEL_OPERATOR_IN, + ["B200"], + ) + ], + [ + ( + "ray.io/accelerator-type", + LabelSelectorOperator.LABEL_OPERATOR_IN, + ["A100"], + ) + ], + ] + assert try_schedule_ls({"CPU": 1}, labels, label_selector_1) == ( + [], + (1, True, 1, 1.0, 1.0), + ) + + # 2) A non‑matching label selector should be infeasible + label_selector_2 = [ + [("ray.io/accelerator-type", LabelSelectorOperator.LABEL_OPERATOR_IN, ["B200"])] + ] + assert try_schedule_ls({"CPU": 1}, labels, label_selector_2) == ( + [{"CPU": 1.0}], + (0, True, 0, 0.0, 0.0), ) @@ -2341,6 +2416,228 @@ def get_nodes_for(gang_resource_requests) -> Tuple[Dict, List[List[Dict]]]: ) == ({"p2.8xlarge": 1}, []) +def test_schedule_node_with_matching_labels(): + """ + Test that a node with matching labels is considered schedulable and used to satisfy a request + with a label_selector. + """ + scheduler = ResourceDemandScheduler(event_logger) + node_type_configs = { + "labelled_node": NodeTypeConfig( + name="labelled_node", + resources={"CPU": 1}, + min_worker_nodes=0, + max_worker_nodes=10, + labels={"accelerator": "A100"}, + ), + } + + # The existing instance has matching dynamic label. + instance = make_autoscaler_instance( + im_instance=Instance( + instance_type="labelled_node", + status=Instance.RAY_RUNNING, + instance_id="1", + node_id=b"r-1", + ), + ray_node=NodeState( + node_id=b"r-1", + ray_node_type_name="labelled_node", + available_resources={"CPU": 1}, + total_resources={"CPU": 1}, + labels={"accelerator": "A100"}, + status=NodeStatus.RUNNING, + ), + cloud_instance_id="c-1", + ) + + # No new nodes should be launched if the existing node satisfies the request. + resource_request = ResourceRequestUtil.make( + {"CPU": 1}, + label_selectors=[ + [("accelerator", LabelSelectorOperator.LABEL_OPERATOR_IN, ["A100"])] + ], + ) + + request = sched_request( + node_type_configs=node_type_configs, + resource_requests=[resource_request], + instances=[instance], + ) + reply = scheduler.schedule(request) + to_launch, _ = _launch_and_terminate(reply) + assert to_launch == {} + + +def test_scale_up_node_to_satisfy_labels(): + """ + Test that a resource request with a label selector scales up a new node with + labels to satisfy the constraint. 
+ """ + scheduler = ResourceDemandScheduler(event_logger) + + node_type_configs = { + "tpu_node": NodeTypeConfig( + name="tpu_node", + resources={"CPU": 1}, + labels={"accelerator": "TPU"}, + min_worker_nodes=0, + max_worker_nodes=10, + ), + "gpu_node": NodeTypeConfig( + name="gpu_node", + resources={"CPU": 1}, + labels={"accelerator": "A100"}, + min_worker_nodes=0, + max_worker_nodes=10, + ), + } + + # Request: want a node with label "accelerator: A100" + resource_request = ResourceRequestUtil.make( + {"CPU": 1}, + label_selectors=[ + [("accelerator", LabelSelectorOperator.LABEL_OPERATOR_IN, ["A100"])] + ], + ) + + request = sched_request( + node_type_configs=node_type_configs, + resource_requests=[resource_request], + ) + + reply = scheduler.schedule(request) + to_launch, _ = _launch_and_terminate(reply) + + assert to_launch == {"gpu_node": 1} + + +def test_label_selector_fallback_priority(): + """ + Test that a resource request with multiple label selectors scales up + the expected node given its fallback priority (i.e. earlier selectors are + satisfied first). + """ + scheduler = ResourceDemandScheduler(event_logger) + + node_type_configs = { + "tpu_node": NodeTypeConfig( + name="tpu_node", + resources={"CPU": 1}, + labels={"accelerator-type": "TPU"}, + min_worker_nodes=0, + max_worker_nodes=10, + ), + "gpu_node": NodeTypeConfig( + name="gpu_node", + resources={"CPU": 1}, + labels={"accelerator-type": "A100"}, + min_worker_nodes=0, + max_worker_nodes=10, + ), + } + + # 1). TPU node is scaled up to satisfy first label selector. + req1 = ResourceRequestUtil.make( + {"CPU": 1}, + label_selectors=[ + [("accelerator-type", LabelSelectorOperator.LABEL_OPERATOR_IN, ["TPU"])], + [("accelerator-type", LabelSelectorOperator.LABEL_OPERATOR_IN, ["A100"])], + ], + ) + reply1 = scheduler.schedule( + sched_request(node_type_configs=node_type_configs, resource_requests=[req1]) + ) + to_launch1, _ = _launch_and_terminate(reply1) + assert to_launch1 == {"tpu_node": 1} + + # 1). Label selector falls back to second priority and scales up A100 node. + req2 = ResourceRequestUtil.make( + {"CPU": 1}, + label_selectors=[ + # infeasible + [("accelerator-type", LabelSelectorOperator.LABEL_OPERATOR_IN, ["B200"])], + [("accelerator-type", LabelSelectorOperator.LABEL_OPERATOR_IN, ["A100"])], + ], + ) + reply2 = scheduler.schedule( + sched_request(node_type_configs=node_type_configs, resource_requests=[req2]) + ) + to_launch2, _ = _launch_and_terminate(reply2) + assert to_launch2 == {"gpu_node": 1} + + +def test_pg_with_bundle_infeasible_label_selectors(): + """ + Test that placement group scheduling honors bundle_label_selectors. 
+ """ + scheduler = ResourceDemandScheduler(event_logger) + AFFINITY = ResourceRequestUtil.PlacementConstraintType.AFFINITY + + node_type_configs = { + "gpu_node": NodeTypeConfig( + name="gpu_node", + resources={"CPU": 4, "GPU": 1}, + min_worker_nodes=0, + max_worker_nodes=5, + labels={"accelerator": "A100"}, + ), + "tpu_node": NodeTypeConfig( + name="tpu_node", + resources={"CPU": 4}, + min_worker_nodes=0, + max_worker_nodes=5, + labels={"accelerator": "TPU"}, + ), + } + + # Create ResourceRequests for a placement group where each bundle has different label selectors + gpu_request = ResourceRequestUtil.make( + {"CPU": 2, "GPU": 1}, + constraints=[(AFFINITY, "pg-1", "")], + label_selectors=[ + [("accelerator", LabelSelectorOperator.LABEL_OPERATOR_IN, ["A100"])] + ], + ) + tpu_request = ResourceRequestUtil.make( + {"CPU": 2}, + constraints=[(AFFINITY, "pg-1", "")], + label_selectors=[ + [("accelerator", LabelSelectorOperator.LABEL_OPERATOR_IN, ["TPU"])] + ], + ) + + request = sched_request( + node_type_configs=node_type_configs, + gang_resource_requests=[[gpu_request, tpu_request]], + ) + + reply = scheduler.schedule(request) + to_launch, _ = _launch_and_terminate(reply) + + assert sorted(to_launch) == sorted({"gpu_node": 1, "tpu_node": 1}) + + # Both bundles require A100, but no node has enough resources -> infeasible + infeasbile_gpu_request = ResourceRequestUtil.make( + {"CPU": 3, "GPU": 1}, + constraints=[(AFFINITY, "pg-2", "")], + label_selectors=[ + [("accelerator", LabelSelectorOperator.LABEL_OPERATOR_IN, ["A100"])] + ], + ) + + request = sched_request( + node_type_configs=node_type_configs, + gang_resource_requests=[[infeasbile_gpu_request, infeasbile_gpu_request]], + ) + + reply = scheduler.schedule(request) + to_launch, _ = _launch_and_terminate(reply) + + assert to_launch == {} + assert len(reply.infeasible_gang_resource_requests) == 1 + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/autoscaler/v2/tests/test_sdk.py b/python/ray/autoscaler/v2/tests/test_sdk.py index 244154e84277..1101951b5245 100644 --- a/python/ray/autoscaler/v2/tests/test_sdk.py +++ b/python/ray/autoscaler/v2/tests/test_sdk.py @@ -357,7 +357,9 @@ def verify(): state, [ ExpectedNodeState( - head_node_id, NodeStatus.RUNNING, labels={f"_PG_{pg_id}": ""} + head_node_id, + NodeStatus.RUNNING, + labels={f"_PG_{pg_id}": ""}, ), ], ) diff --git a/python/ray/autoscaler/v2/utils.py b/python/ray/autoscaler/v2/utils.py index 8d86b024a545..d3128e961c63 100644 --- a/python/ray/autoscaler/v2/utils.py +++ b/python/ray/autoscaler/v2/utils.py @@ -43,6 +43,10 @@ from ray.core.generated.autoscaler_pb2 import ( ResourceRequestByCount as ResourceRequestByCountProto, ) +from ray.core.generated.common_pb2 import ( + LabelSelectorConstraint, + LabelSelector, +) from ray.experimental.internal_kv import internal_kv_get_gcs_client @@ -188,41 +192,63 @@ def to_resource_maps( def make( resources_map: Dict[str, float], constraints: Optional[List[Tuple[PlacementConstraintType, str, str]]] = None, + label_selectors: Optional[List[List[Tuple[str, int, List[str]]]]] = None, ) -> ResourceRequest: """ Make a resource request from the given resources map. Args: - resources_map: the resources map + resources_map: Mapping of resource names to quantities. + constraints: Placement constraints. Each tuple is (constraint_type, + label_key, label_value), where `constraint_type` is a + PlacementConstraintType (AFFINITY or ANTI_AFFINITY). 
+ label_selectors: Optional list of label selectors. Each selector is + a list of (label_key, operator_enum, label_values) tuples. Returns: - request: the resource request + request: the ResourceRequest object """ request = ResourceRequest() for resource_name, quantity in resources_map.items(): request.resources_bundle[resource_name] = quantity - if constraints is None: - return request - - for constraint_type, label, value in constraints: - if constraint_type == ResourceRequestUtil.PlacementConstraintType.AFFINITY: - request.placement_constraints.append( - PlacementConstraint( - affinity=AffinityConstraint(label_name=label, label_value=value) + if constraints is not None: + for constraint_type, label, value in constraints: + if ( + constraint_type + == ResourceRequestUtil.PlacementConstraintType.AFFINITY + ): + request.placement_constraints.append( + PlacementConstraint( + affinity=AffinityConstraint( + label_name=label, label_value=value + ) + ) ) - ) - elif ( - constraint_type - == ResourceRequestUtil.PlacementConstraintType.ANTI_AFFINITY - ): - request.placement_constraints.append( - PlacementConstraint( - anti_affinity=AntiAffinityConstraint( - label_name=label, label_value=value + elif ( + constraint_type + == ResourceRequestUtil.PlacementConstraintType.ANTI_AFFINITY + ): + request.placement_constraints.append( + PlacementConstraint( + anti_affinity=AntiAffinityConstraint( + label_name=label, label_value=value + ) ) ) - ) - else: - raise ValueError(f"Unknown constraint type: {constraint_type}") + else: + raise ValueError(f"Unknown constraint type: {constraint_type}") + + if label_selectors is not None: + for selector in label_selectors: + selector_proto = LabelSelector() + for label_key, operator_enum, label_values in selector: + selector_proto.label_constraints.append( + LabelSelectorConstraint( + label_key=label_key, + operator=operator_enum, + label_values=label_values, + ) + ) + request.label_selectors.append(selector_proto) return request @@ -250,7 +276,7 @@ def combine_requests_with_affinity( # Map of set of serialized affinity constraint to the list of resource requests requests_by_affinity: Dict[ - Tuple[str, str], List[ResourceRequest] + Tuple[str, str, Tuple], List[ResourceRequest] ] = defaultdict(list) combined_requests: List[ResourceRequest] = [] @@ -268,10 +294,14 @@ def combine_requests_with_affinity( constraint = request.placement_constraints[0] if constraint.HasField("affinity"): + # Combine requests with affinity and label selectors. affinity = constraint.affinity - requests_by_affinity[ - (affinity.label_name, affinity.label_value) - ].append(request) + key = ( + affinity.label_name, + affinity.label_value, + ResourceRequestUtil._label_selector_key(request.label_selectors), + ) + requests_by_affinity[key].append(request) elif constraint.HasField("anti_affinity"): # We don't need to combine requests with anti-affinity constraints. combined_requests.append(request) @@ -279,6 +309,7 @@ def combine_requests_with_affinity( for ( affinity_label_name, affinity_label_value, + label_selector_key, ), requests in requests_by_affinity.items(): combined_request = ResourceRequest() @@ -297,10 +328,33 @@ def combine_requests_with_affinity( PlacementConstraint(affinity=affinity_constraint) ) + combined_request.label_selectors.extend(requests[0].label_selectors) + combined_requests.append(combined_request) return combined_requests + def _label_selector_key( + label_selectors: List[LabelSelector], + ) -> Tuple: + """ + Convert label selectors into a hashable form for grouping. 
+ This is used for gang requests with identical label_selectors. + """ + result = [] + for selector in label_selectors: + constraints = [] + for constraint in selector.label_constraints: + constraints.append( + ( + constraint.label_key, + constraint.operator, + tuple(sorted(constraint.label_values)), + ) + ) + result.append(tuple(constraints)) + return tuple(result) + class ClusterStatusFormatter: """ @@ -891,6 +945,7 @@ def _parse_nodes( resource_usage=node_resource_usage, failure_detail=failure_detail, node_activity=node_state.node_activity, + labels=dict(node_state.labels), ) if node_state.status == NodeStatus.DEAD: diff --git a/src/ray/common/scheduling/label_selector.cc b/src/ray/common/scheduling/label_selector.cc index 0b315a51303e..1f30175a4636 100644 --- a/src/ray/common/scheduling/label_selector.cc +++ b/src/ray/common/scheduling/label_selector.cc @@ -32,6 +32,20 @@ LabelSelector::LabelSelector( } } +rpc::LabelSelector LabelSelector::ToProto() const { + rpc::LabelSelector result; + for (const auto &constraint : constraints_) { + auto *proto_constraint = result.add_label_constraints(); + proto_constraint->set_label_key(constraint.GetLabelKey()); + proto_constraint->set_operator_( + static_cast(constraint.GetOperator())); + for (const auto &val : constraint.GetLabelValues()) { + proto_constraint->add_label_values(val); + } + } + return result; +} + void LabelSelector::AddConstraint(const std::string &key, const std::string &value) { auto [op, values] = ParseLabelSelectorValue(key, value); LabelConstraint constraint(key, op, values); diff --git a/src/ray/common/scheduling/label_selector.h b/src/ray/common/scheduling/label_selector.h index c2b59a507c27..4f96e5cfc1f8 100644 --- a/src/ray/common/scheduling/label_selector.h +++ b/src/ray/common/scheduling/label_selector.h @@ -19,6 +19,7 @@ #include "absl/container/flat_hash_set.h" #include "google/protobuf/map.h" +#include "src/ray/protobuf/common.pb.h" namespace ray { @@ -60,6 +61,8 @@ class LabelSelector { explicit LabelSelector( const google::protobuf::Map &label_selector); + rpc::LabelSelector ToProto() const; + void AddConstraint(const std::string &key, const std::string &value); void AddConstraint(LabelConstraint constraint) { @@ -75,4 +78,28 @@ class LabelSelector { std::vector constraints_; }; +inline bool operator==(const LabelConstraint &lhs, const LabelConstraint &rhs) { + return lhs.GetLabelKey() == rhs.GetLabelKey() && + lhs.GetOperator() == rhs.GetOperator() && + lhs.GetLabelValues() == rhs.GetLabelValues(); +} + +inline bool operator==(const LabelSelector &lhs, const LabelSelector &rhs) { + return lhs.GetConstraints() == rhs.GetConstraints(); +} + +template +H AbslHashValue(H h, const LabelSelector &label_selector) { + h = H::combine(std::move(h), label_selector.GetConstraints().size()); + for (const auto &constraint : label_selector.GetConstraints()) { + h = H::combine(std::move(h), + constraint.GetLabelKey(), + static_cast(constraint.GetOperator())); + for (const auto &value : constraint.GetLabelValues()) { + h = H::combine(std::move(h), value); + } + } + return h; +} + } // namespace ray diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index de1fd01cec15..2b8aa51cfd87 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -109,6 +109,10 @@ void TaskSpecification::ComputeResources() { new ResourceSet(MapFromProtobuf(required_placement_resources))); } + // Set LabelSelector required for scheduling if specified. 
Parses string map + // from proto to LabelSelector data type. + label_selector_ = std::make_shared(message_->label_selector()); + if (!IsActorTask()) { // There is no need to compute `SchedulingClass` for actor tasks since // the actor tasks need not be scheduled. @@ -121,17 +125,17 @@ void TaskSpecification::ComputeResources() { : GetRequiredResources(); const auto &function_descriptor = FunctionDescriptor(); auto depth = GetDepth(); - auto sched_cls_desc = SchedulingClassDescriptor( - resource_set, function_descriptor, depth, GetSchedulingStrategy()); + auto label_selector = GetLabelSelector(); + auto sched_cls_desc = SchedulingClassDescriptor(resource_set, + label_selector, + function_descriptor, + depth, + GetSchedulingStrategy()); // Map the scheduling class descriptor to an integer for performance. sched_cls_id_ = GetSchedulingClass(sched_cls_desc); } runtime_env_hash_ = CalculateRuntimeEnvHash(SerializedRuntimeEnv()); - - // Set LabelSelector required for scheduling if specified. Parses string map - // from proto to LabelSelector data type. - label_selector_ = std::make_shared(message_->label_selector()); } // Task specification getter methods. diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 6b0daae96154..d582241f00c1 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -22,6 +22,7 @@ #include #include +#include "absl/hash/hash.h" #include "absl/synchronization/mutex.h" #include "ray/common/function_descriptor.h" #include "ray/common/grpc_util.h" @@ -76,20 +77,24 @@ typedef int SchedulingClass; struct SchedulingClassDescriptor { public: explicit SchedulingClassDescriptor(ResourceSet rs, + LabelSelector ls, FunctionDescriptor fd, int64_t d, rpc::SchedulingStrategy scheduling_strategy) : resource_set(std::move(rs)), + label_selector(std::move(ls)), function_descriptor(std::move(fd)), depth(d), scheduling_strategy(std::move(scheduling_strategy)) {} ResourceSet resource_set; + LabelSelector label_selector; FunctionDescriptor function_descriptor; int64_t depth; rpc::SchedulingStrategy scheduling_strategy; bool operator==(const SchedulingClassDescriptor &other) const { return depth == other.depth && resource_set == other.resource_set && + label_selector == other.label_selector && function_descriptor == other.function_descriptor && scheduling_strategy == other.scheduling_strategy; } @@ -105,7 +110,21 @@ struct SchedulingClassDescriptor { for (const auto &pair : resource_set.GetResourceMap()) { buffer << pair.first << " : " << pair.second << ", "; } + buffer << "}"; + + buffer << "label_selector={"; + for (const auto &constraint : label_selector.GetConstraints()) { + buffer << constraint.GetLabelKey() << " " + << (constraint.GetOperator() == ray::LabelSelectorOperator::LABEL_IN ? 
"in" + : "!in") + << " ("; + for (const auto &val : constraint.GetLabelValues()) { + buffer << val << ", "; + } + buffer << "), "; + } buffer << "}}"; + return buffer.str(); } @@ -119,6 +138,16 @@ struct SchedulingClassDescriptor { return buffer.str(); } }; + +template +H AbslHashValue(H h, const SchedulingClassDescriptor &sched_cls) { + return H::combine(std::move(h), + sched_cls.resource_set, + sched_cls.function_descriptor->Hash(), + sched_cls.depth, + sched_cls.scheduling_strategy, + sched_cls.label_selector); +} } // namespace ray namespace std { @@ -201,17 +230,6 @@ struct hash { return hash_val; } }; - -template <> -struct hash { - size_t operator()(const ray::SchedulingClassDescriptor &sched_cls) const { - size_t hash_val = std::hash()(sched_cls.resource_set); - hash_val ^= sched_cls.function_descriptor->Hash(); - hash_val ^= sched_cls.depth; - hash_val ^= std::hash()(sched_cls.scheduling_strategy); - return hash_val; - } -}; } // namespace std namespace ray { diff --git a/src/ray/common/test/task_spec_test.cc b/src/ray/common/test/task_spec_test.cc index 17cf508f48cf..2876faa20011 100644 --- a/src/ray/common/test/task_spec_test.cc +++ b/src/ray/common/test/task_spec_test.cc @@ -23,89 +23,99 @@ TEST(TaskSpecTest, TestSchedulingClassDescriptor) { ResourceSet resources(absl::flat_hash_map({{"a", 1.0}})); rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_spread_scheduling_strategy(); - SchedulingClassDescriptor descriptor1(resources, descriptor, 0, scheduling_strategy); - SchedulingClassDescriptor descriptor2(resources, descriptor, 1, scheduling_strategy); + SchedulingClassDescriptor descriptor1( + resources, LabelSelector(), descriptor, 0, scheduling_strategy); + SchedulingClassDescriptor descriptor2( + resources, LabelSelector(), descriptor, 1, scheduling_strategy); scheduling_strategy.mutable_default_scheduling_strategy(); - SchedulingClassDescriptor descriptor3(resources, descriptor, 0, scheduling_strategy); + SchedulingClassDescriptor descriptor3( + resources, LabelSelector(), descriptor, 0, scheduling_strategy); scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_node_id("x"); scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_soft(true); - SchedulingClassDescriptor descriptor4(resources, descriptor, 0, scheduling_strategy); + SchedulingClassDescriptor descriptor4( + resources, LabelSelector(), descriptor, 0, scheduling_strategy); scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_node_id("y"); - SchedulingClassDescriptor descriptor5(resources, descriptor, 0, scheduling_strategy); - SchedulingClassDescriptor descriptor6(resources, descriptor, 0, scheduling_strategy); + SchedulingClassDescriptor descriptor5( + resources, LabelSelector(), descriptor, 0, scheduling_strategy); + SchedulingClassDescriptor descriptor6( + resources, LabelSelector(), descriptor, 0, scheduling_strategy); scheduling_strategy.mutable_node_affinity_scheduling_strategy() ->set_spill_on_unavailable(true); - SchedulingClassDescriptor descriptor10(resources, descriptor, 0, scheduling_strategy); + SchedulingClassDescriptor descriptor10( + resources, LabelSelector(), descriptor, 0, scheduling_strategy); scheduling_strategy.mutable_placement_group_scheduling_strategy() ->set_placement_group_id("o"); scheduling_strategy.mutable_placement_group_scheduling_strategy() ->set_placement_group_bundle_index(0); scheduling_strategy.mutable_placement_group_scheduling_strategy() ->set_placement_group_capture_child_tasks(true); - 
SchedulingClassDescriptor descriptor7(resources, descriptor, 0, scheduling_strategy); + SchedulingClassDescriptor descriptor7( + resources, LabelSelector(), descriptor, 0, scheduling_strategy); scheduling_strategy.mutable_placement_group_scheduling_strategy() ->set_placement_group_bundle_index(1); - SchedulingClassDescriptor descriptor8(resources, descriptor, 0, scheduling_strategy); + SchedulingClassDescriptor descriptor8( + resources, LabelSelector(), descriptor, 0, scheduling_strategy); scheduling_strategy.mutable_placement_group_scheduling_strategy() ->set_placement_group_bundle_index(0); - SchedulingClassDescriptor descriptor9(resources, descriptor, 0, scheduling_strategy); + SchedulingClassDescriptor descriptor9( + resources, LabelSelector(), descriptor, 0, scheduling_strategy); ASSERT_TRUE(descriptor1 == descriptor1); - ASSERT_TRUE(std::hash()(descriptor1) == - std::hash()(descriptor1)); + ASSERT_TRUE(absl::Hash()(descriptor1) == + absl::Hash()(descriptor1)); ASSERT_TRUE(TaskSpecification::GetSchedulingClass(descriptor1) == TaskSpecification::GetSchedulingClass(descriptor1)); ASSERT_FALSE(descriptor1 == descriptor2); - ASSERT_FALSE(std::hash()(descriptor1) == - std::hash()(descriptor2)); + ASSERT_FALSE(absl::Hash()(descriptor1) == + absl::Hash()(descriptor2)); ASSERT_FALSE(TaskSpecification::GetSchedulingClass(descriptor1) == TaskSpecification::GetSchedulingClass(descriptor2)); ASSERT_FALSE(descriptor1 == descriptor3); - ASSERT_FALSE(std::hash()(descriptor1) == - std::hash()(descriptor3)); + ASSERT_FALSE(absl::Hash()(descriptor1) == + absl::Hash()(descriptor3)); ASSERT_FALSE(TaskSpecification::GetSchedulingClass(descriptor1) == TaskSpecification::GetSchedulingClass(descriptor3)); ASSERT_FALSE(descriptor1 == descriptor4); - ASSERT_FALSE(std::hash()(descriptor1) == - std::hash()(descriptor4)); + ASSERT_FALSE(absl::Hash()(descriptor1) == + absl::Hash()(descriptor4)); ASSERT_FALSE(TaskSpecification::GetSchedulingClass(descriptor1) == TaskSpecification::GetSchedulingClass(descriptor4)); ASSERT_FALSE(descriptor4 == descriptor5); - ASSERT_FALSE(std::hash()(descriptor4) == - std::hash()(descriptor5)); + ASSERT_FALSE(absl::Hash()(descriptor4) == + absl::Hash()(descriptor5)); ASSERT_FALSE(TaskSpecification::GetSchedulingClass(descriptor4) == TaskSpecification::GetSchedulingClass(descriptor5)); ASSERT_TRUE(descriptor5 == descriptor6); - ASSERT_TRUE(std::hash()(descriptor5) == - std::hash()(descriptor6)); + ASSERT_TRUE(absl::Hash()(descriptor5) == + absl::Hash()(descriptor6)); ASSERT_TRUE(TaskSpecification::GetSchedulingClass(descriptor5) == TaskSpecification::GetSchedulingClass(descriptor6)); ASSERT_FALSE(descriptor6 == descriptor10); - ASSERT_FALSE(std::hash()(descriptor6) == - std::hash()(descriptor10)); + ASSERT_FALSE(absl::Hash()(descriptor6) == + absl::Hash()(descriptor10)); ASSERT_FALSE(TaskSpecification::GetSchedulingClass(descriptor6) == TaskSpecification::GetSchedulingClass(descriptor10)); ASSERT_FALSE(descriptor6 == descriptor7); - ASSERT_FALSE(std::hash()(descriptor6) == - std::hash()(descriptor7)); + ASSERT_FALSE(absl::Hash()(descriptor6) == + absl::Hash()(descriptor7)); ASSERT_FALSE(TaskSpecification::GetSchedulingClass(descriptor6) == TaskSpecification::GetSchedulingClass(descriptor7)); ASSERT_FALSE(descriptor7 == descriptor8); - ASSERT_FALSE(std::hash()(descriptor7) == - std::hash()(descriptor8)); + ASSERT_FALSE(absl::Hash()(descriptor7) == + absl::Hash()(descriptor8)); ASSERT_FALSE(TaskSpecification::GetSchedulingClass(descriptor7) == 
TaskSpecification::GetSchedulingClass(descriptor8)); ASSERT_TRUE(descriptor7 == descriptor9); - ASSERT_TRUE(std::hash()(descriptor7) == - std::hash()(descriptor9)); + ASSERT_TRUE(absl::Hash()(descriptor7) == + absl::Hash()(descriptor9)); ASSERT_TRUE(TaskSpecification::GetSchedulingClass(descriptor7) == TaskSpecification::GetSchedulingClass(descriptor9)); } @@ -270,10 +280,10 @@ TEST(TaskSpecTest, TestNodeLabelSchedulingStrategy) { expr_2->set_key("key"); expr_2->mutable_operator_()->mutable_label_in()->add_values("value1"); - ASSERT_TRUE(std::hash()(scheduling_strategy_1) == - std::hash()(scheduling_strategy_1)); - ASSERT_TRUE(std::hash()(scheduling_strategy_1) == - std::hash()(scheduling_strategy_2)); + ASSERT_TRUE(absl::Hash()(scheduling_strategy_1) == + absl::Hash()(scheduling_strategy_1)); + ASSERT_TRUE(absl::Hash()(scheduling_strategy_1) == + absl::Hash()(scheduling_strategy_2)); rpc::SchedulingStrategy scheduling_strategy_3; auto expr_3 = scheduling_strategy_3.mutable_node_label_scheduling_strategy() @@ -281,8 +291,8 @@ TEST(TaskSpecTest, TestNodeLabelSchedulingStrategy) { ->add_expressions(); expr_3->set_key("key"); expr_3->mutable_operator_()->mutable_label_in()->add_values("value1"); - ASSERT_FALSE(std::hash()(scheduling_strategy_1) == - std::hash()(scheduling_strategy_3)); + ASSERT_FALSE(absl::Hash()(scheduling_strategy_1) == + absl::Hash()(scheduling_strategy_3)); rpc::SchedulingStrategy scheduling_strategy_4; auto expr_4 = scheduling_strategy_4.mutable_node_label_scheduling_strategy() @@ -292,8 +302,8 @@ TEST(TaskSpecTest, TestNodeLabelSchedulingStrategy) { expr_4->mutable_operator_()->mutable_label_in()->add_values("value1"); expr_4->mutable_operator_()->mutable_label_in()->add_values("value2"); - ASSERT_FALSE(std::hash()(scheduling_strategy_1) == - std::hash()(scheduling_strategy_4)); + ASSERT_FALSE(absl::Hash()(scheduling_strategy_1) == + absl::Hash()(scheduling_strategy_4)); rpc::SchedulingStrategy scheduling_strategy_5; auto expr_5 = scheduling_strategy_5.mutable_node_label_scheduling_strategy() @@ -302,7 +312,7 @@ TEST(TaskSpecTest, TestNodeLabelSchedulingStrategy) { expr_5->set_key("key"); expr_5->mutable_operator_()->mutable_label_not_in()->add_values("value1"); - ASSERT_FALSE(std::hash()(scheduling_strategy_1) == - std::hash()(scheduling_strategy_5)); + ASSERT_FALSE(absl::Hash()(scheduling_strategy_1) == + absl::Hash()(scheduling_strategy_5)); } } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc index d15c1dfaeec8..26c7eff84d1b 100644 --- a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc @@ -22,6 +22,7 @@ #include "ray/gcs/gcs_server/gcs_actor_manager.h" #include "ray/gcs/gcs_server/gcs_node_manager.h" #include "ray/gcs/gcs_server/gcs_placement_group_mgr.h" +#include "ray/gcs/gcs_server/state_util.h" #include "ray/gcs/pb_util.h" namespace ray { @@ -229,14 +230,31 @@ void GcsAutoscalerStateManager::GetPendingGangResourceRequests( // to node crashed. continue; } - // Add the resources. - auto resource_req = gang_resource_req->add_requests(); - *resource_req->mutable_resources_bundle() = - std::move(*bundle.mutable_unit_resources()); + + const auto &unit_resources = bundle.unit_resources(); + + // Add the resources. This field will be removed after migrating to + // use the BundleSelector for GangResourceRequests. 
+ auto legacy_resource_req = gang_resource_req->add_requests(); + *legacy_resource_req->mutable_resources_bundle() = unit_resources; + + // Create a new BundleSelector + auto *bundle_selector = gang_resource_req->add_bundle_selectors(); + + // Add ResourceRequest for this bundle. + auto *bundle_resource_req = bundle_selector->add_resource_requests(); + *bundle_resource_req->mutable_resources_bundle() = unit_resources; + + // Parse label selector map into LabelSelector proto in ResourceRequest + if (!bundle.label_selector().empty()) { + ray::LabelSelector selector(bundle.label_selector()); + *bundle_resource_req->add_label_selectors() = selector.ToProto(); + } // Add the placement constraint. if (pg_constraint.has_value()) { - resource_req->add_placement_constraints()->CopyFrom(pg_constraint.value()); + legacy_resource_req->add_placement_constraints()->CopyFrom(pg_constraint.value()); + bundle_resource_req->add_placement_constraints()->CopyFrom(pg_constraint.value()); } } } @@ -262,6 +280,8 @@ void GcsAutoscalerStateManager::OnNodeAdd(const rpc::GcsNodeInfo &node) { // autoscaler reports). Temporary underreporting when node is added is fine. (*node_info->second.second.mutable_resources_total()) = node.resources_total(); (*node_info->second.second.mutable_resources_available()) = node.resources_total(); + // Populate node labels. + (*node_info->second.second.mutable_labels()) = node.labels(); } void GcsAutoscalerStateManager::UpdateResourceLoadAndUsage(rpc::ResourcesData data) { @@ -280,11 +300,10 @@ void GcsAutoscalerStateManager::UpdateResourceLoadAndUsage(rpc::ResourcesData da iter->second.first = absl::Now(); } -absl::flat_hash_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand> +absl::flat_hash_map<ResourceDemandKey, rpc::ResourceDemand> GcsAutoscalerStateManager::GetAggregatedResourceLoad() const { RAY_CHECK(thread_checker_.IsOnSameThread()); - absl::flat_hash_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand> - aggregate_load; + absl::flat_hash_map<ResourceDemandKey, rpc::ResourceDemand> aggregate_load; for (const auto &info : node_resource_info_) { gcs::FillAggregateLoad(info.second.second, &aggregate_load); } @@ -304,7 +323,9 @@ void GcsAutoscalerStateManager::GetPendingResourceRequests( rpc::autoscaler::ClusterResourceState *state) { RAY_CHECK(thread_checker_.IsOnSameThread()); auto aggregate_load = GetAggregatedResourceLoad(); - for (const auto &[shape, demand] : aggregate_load) { + for (auto &[key, demand] : aggregate_load) { + const auto &shape = key.shape; + auto num_pending = demand.num_infeasible_requests_queued() + demand.backlog_size() + demand.num_ready_requests_queued(); if (num_pending > 0) { @@ -312,6 +333,11 @@ void GcsAutoscalerStateManager::GetPendingResourceRequests( pending_req->set_count(num_pending); auto req = pending_req->mutable_request(); req->mutable_resources_bundle()->insert(shape.begin(), shape.end()); + + // Add label selectors to ResourceRequest + for (auto &selector : key.label_selectors) { + *req->add_label_selectors() = std::move(selector); + } } } } @@ -390,6 +416,9 @@ void GcsAutoscalerStateManager::GetNodeStates( node_state_proto->mutable_dynamic_labels()->insert( {FormatPlacementGroupLabelName(pg_id.Hex()), ""}); } + // Add Ray node labels.
+ const auto &node_labels = gcs_node_info.labels(); + node_state_proto->mutable_labels()->insert(node_labels.begin(), node_labels.end()); }; const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes(); @@ -482,14 +511,35 @@ std::string GcsAutoscalerStateManager::DebugString() const { << last_cluster_resource_state_version_ << "\n- pending demands:\n"; auto aggregate_load = GetAggregatedResourceLoad(); - for (const auto &[shape, demand] : aggregate_load) { + for (const auto &[key, demand] : aggregate_load) { auto num_pending = demand.num_infeasible_requests_queued() + demand.backlog_size() + demand.num_ready_requests_queued(); stream << "\t{"; if (num_pending > 0) { - for (const auto &[resource, quantity] : shape) { - stream << resource << ": " << quantity << ", "; + for (const auto &entry : key.shape) { + stream << entry.first << ": " << entry.second << ", "; + } + if (!key.label_selectors.empty()) { + stream << "label_selectors: ["; + for (const auto &selector : key.label_selectors) { + stream << "{"; + for (const auto &constraint : selector.label_constraints()) { + stream << constraint.label_key() << " " + << (constraint.operator_() == + rpc::LabelSelectorOperator::LABEL_OPERATOR_IN + ? "in" + : "!in") + << " ["; + for (const auto &val : constraint.label_values()) { + stream << val << ","; + } + stream << "]" + << " "; + } + stream << "}, "; + } + stream << "]"; } } stream << "} * " << num_pending << "\n"; diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h index dbd3f7b2f6f2..6994f075c666 100644 --- a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h +++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h @@ -21,6 +21,7 @@ #include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_kv_manager.h" +#include "ray/gcs/gcs_server/state_util.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" @@ -92,8 +93,8 @@ class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler private: /// \brief Get the aggregated resource load from all nodes. - absl::flat_hash_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand> - GetAggregatedResourceLoad() const; + absl::flat_hash_map<ResourceDemandKey, rpc::ResourceDemand> GetAggregatedResourceLoad() + const; /// \brief Internal method for populating the rpc::ClusterResourceState /// protobuf. diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 75dbcc33349e..088c32cc2b15 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -191,8 +191,7 @@ void GcsResourceManager::HandleGetAllResourceUsage( rpc::SendReplyCallback send_reply_callback) { if (!node_resource_usages_.empty()) { rpc::ResourceUsageBatchData batch; - absl::flat_hash_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand> - aggregate_load; + absl::flat_hash_map<ResourceDemandKey, rpc::ResourceDemand> aggregate_load; for (const auto &usage : node_resource_usages_) { // Aggregate the load reported by each raylet.
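Illustrative aside, not part of the patch: the in / !in operators rendered in the debug string above are the same semantics the scheduler evaluates against a node's label map. A rough plain-Python restatement, with invented labels and helper names:

LABEL_OPERATOR_IN, LABEL_OPERATOR_NOT_IN = "in", "!in"

def node_satisfies(node_labels, constraints):
    # constraints: list of (label_key, operator, allowed_values) tuples.
    for key, op, values in constraints:
        node_val = node_labels.get(key)
        if op == LABEL_OPERATOR_IN and node_val not in values:
            return False
        if op == LABEL_OPERATOR_NOT_IN and node_val in values:
            return False
    return True

node_labels = {"accelerator-type": "TPU", "market-type": "spot"}
assert node_satisfies(node_labels, [("accelerator-type", LABEL_OPERATOR_IN, {"TPU"})])
assert not node_satisfies(node_labels, [("market-type", LABEL_OPERATOR_NOT_IN, {"spot"})])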
@@ -217,9 +216,12 @@ void GcsResourceManager::HandleGetAllResourceUsage( for (const auto &demand : aggregate_load) { auto demand_proto = batch.mutable_resource_load_by_shape()->add_resource_demands(); demand_proto->CopyFrom(demand.second); - for (const auto &resource_pair : demand.first) { + for (const auto &resource_pair : demand.first.shape) { (*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second; } + for (auto &selector : demand.first.label_selectors) { + *demand_proto->add_label_selectors() = std::move(selector); + } } // Update placement group load to heartbeat batch. // This is updated only one per second. diff --git a/src/ray/gcs/gcs_server/state_util.cc b/src/ray/gcs/gcs_server/state_util.cc index e7505836b367..b1f41b393682 100644 --- a/src/ray/gcs/gcs_server/state_util.cc +++ b/src/ray/gcs/gcs_server/state_util.cc @@ -19,12 +19,19 @@ namespace ray { namespace gcs { -void FillAggregateLoad(const rpc::ResourcesData &resources_data, - absl::flat_hash_map<google::protobuf::Map<std::string, double>, - rpc::ResourceDemand> *aggregate_load) { +void FillAggregateLoad( + const rpc::ResourcesData &resources_data, + absl::flat_hash_map<ResourceDemandKey, rpc::ResourceDemand> *aggregate_load) { const auto &load = resources_data.resource_load_by_shape(); for (const auto &demand : load.resource_demands()) { - auto &aggregate_demand = (*aggregate_load)[demand.shape()]; + ResourceDemandKey key; + key.shape = demand.shape(); + + key.label_selectors.reserve(demand.label_selectors().size()); + for (const auto &selector : demand.label_selectors()) { + key.label_selectors.push_back(selector); + } + auto &aggregate_demand = (*aggregate_load)[key]; aggregate_demand.set_num_ready_requests_queued( aggregate_demand.num_ready_requests_queued() + demand.num_ready_requests_queued()); diff --git a/src/ray/gcs/gcs_server/state_util.h b/src/ray/gcs/gcs_server/state_util.h index 186ce4a8c388..38dc625969ea 100644 --- a/src/ray/gcs/gcs_server/state_util.h +++ b/src/ray/gcs/gcs_server/state_util.h @@ -17,8 +17,63 @@ #include #include "absl/container/flat_hash_map.h" +#include "absl/hash/hash.h" #include "src/ray/protobuf/gcs.pb.h" +namespace ray { +namespace gcs { + +struct ResourceDemandKey { + google::protobuf::Map<std::string, double> shape; + std::vector<rpc::LabelSelector> label_selectors; +}; + +inline bool operator==(const ResourceDemandKey &lhs, const ResourceDemandKey &rhs) { + if (lhs.shape.size() != rhs.shape.size()) { + return false; + } + for (const auto &entry : lhs.shape) { + auto it = rhs.shape.find(entry.first); + if (it == rhs.shape.end() || it->second != entry.second) { + return false; + } + } + + if (lhs.label_selectors.size() != rhs.label_selectors.size()) { + return false; + } + for (size_t i = 0; i < lhs.label_selectors.size(); ++i) { + if (lhs.label_selectors[i].SerializeAsString() != + rhs.label_selectors[i].SerializeAsString()) { + return false; + } + } + return true; +} + +template <typename H> +H AbslHashValue(H h, const ResourceDemandKey &key); + +/// Aggregate nodes' pending task info. +/// +/// \param resources_data A node's pending task info (by shape). +/// \param aggregate_load[out] The aggregate pending task info (across the cluster).
+void FillAggregateLoad(
+    const rpc::ResourcesData &resources_data,
+    absl::flat_hash_map<ResourceDemandKey, rpc::ResourceDemand> *aggregate_load);
+
+}  // namespace gcs
+}  // namespace ray
+
+template <typename H>
+H ray::gcs::AbslHashValue(H h, const ray::gcs::ResourceDemandKey &key) {
+  h = H::combine(std::move(h), key.shape);
+  for (const auto &selector : key.label_selectors) {
+    h = H::combine(std::move(h), selector.SerializeAsString());
+  }
+  return h;
+}
+
 namespace std {
 template <>
 struct hash<google::protobuf::Map<std::string, double>> {
@@ -48,17 +103,11 @@ struct equal_to<google::protobuf::Map<std::string, double>> {
     return true;
   }
 };
-}  // namespace std
-
-namespace ray {
-namespace gcs {
-/// Aggregate nodes' pending task info.
-///
-/// \param resources_data A node's pending task info (by shape).
-/// \param aggregate_load[out] The aggregate pending task info (across the cluster).
-void FillAggregateLoad(const rpc::ResourcesData &resources_data,
-                       absl::flat_hash_map<google::protobuf::Map<std::string, double>,
-                                           rpc::ResourceDemand> *aggregate_load);
-}  // namespace gcs
-}  // namespace ray
+template <>
+struct hash<ray::gcs::ResourceDemandKey> {
+  size_t operator()(const ray::gcs::ResourceDemandKey &k) const {
+    return absl::Hash<ray::gcs::ResourceDemandKey>{}(k);
+  }
+};
+}  // namespace std
diff --git a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
index bf5d3ee1f4ce..a085164aa19b 100644
--- a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
@@ -141,6 +141,15 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
   void CheckNodeLabels(const rpc::autoscaler::NodeState &node_state,
                        const std::unordered_map<std::string, std::string> &labels) {
+    ASSERT_EQ(node_state.labels_size(), labels.size());
+    for (const auto &label : labels) {
+      ASSERT_EQ(node_state.labels().at(label.first), label.second);
+    }
+  }
+
+  void CheckNodeDynamicLabels(
+      const rpc::autoscaler::NodeState &node_state,
+      const std::unordered_map<std::string, std::string> &labels) {
     ASSERT_EQ(node_state.dynamic_labels_size(), labels.size());
     for (const auto &label : labels) {
       ASSERT_EQ(node_state.dynamic_labels().at(label.first), label.second);
@@ -469,9 +478,9 @@ TEST_F(GcsAutoscalerStateManagerTest, TestNodeDynamicLabelsWithPG) {
     const auto &state = GetClusterResourceStateSync();
     ASSERT_EQ(state.node_states_size(), 1);
-    CheckNodeLabels(state.node_states(0),
-                    {{FormatPlacementGroupLabelName(pg1.Hex()), ""},
-                     {FormatPlacementGroupLabelName(pg2.Hex()), ""}});
+    CheckNodeDynamicLabels(state.node_states(0),
+                           {{FormatPlacementGroupLabelName(pg1.Hex()), ""},
+                            {FormatPlacementGroupLabelName(pg2.Hex()), ""}});
   }
 }
 
@@ -495,11 +504,13 @@ TEST_F(GcsAutoscalerStateManagerTest, TestBasicResourceRequests) {
                       {Mocker::GenResourceDemand({{"CPU", 1}},
                                                  /* nun_ready_queued */ 1,
                                                  /* nun_infeasible */ 1,
-                                                 /* num_backlog */ 0),
+                                                 /* num_backlog */ 0,
+                                                 /* label_selectors */ {}),
                        Mocker::GenResourceDemand({{"CPU", 4}, {"GPU", 2}},
                                                  /* num_ready_queued */ 0,
                                                  /* num_infeasible */ 1,
-                                                 /* num_backlog */ 1)});
+                                                 /* num_backlog */ 1,
+                                                 /* label_selectors */ {})});
   const auto &state = GetClusterResourceStateSync();
   // Expect each pending resources shape to be num_infeasible + num_backlog.
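As a side note on the state_util.h additions above: a minimal usage sketch, not part of the patch, assuming a Ray build where state_util.h and the generated protos are on the include path (the "accelerator-type"/"TPU" values are invented). With the AbslHashValue overload plus the std::hash bridge, ResourceDemandKey can key either an absl::flat_hash_map or a std::unordered_map:

#include <unordered_map>

#include "absl/container/flat_hash_map.h"
#include "ray/gcs/gcs_server/state_util.h"

void ExampleAggregation() {
  ray::gcs::ResourceDemandKey key;
  key.shape["CPU"] = 2;

  ray::rpc::LabelSelector selector;
  auto *constraint = selector.add_label_constraints();
  constraint->set_label_key("accelerator-type");
  constraint->set_operator_(ray::rpc::LABEL_OPERATOR_IN);
  constraint->add_label_values("TPU");
  key.label_selectors.push_back(selector);

  // Demands with the same shape but different selectors land in different buckets.
  absl::flat_hash_map<ray::gcs::ResourceDemandKey, int> absl_counts;
  std::unordered_map<ray::gcs::ResourceDemandKey, int> std_counts;
  absl_counts[key] += 1;
  std_counts[key] += 1;
}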
@@ -887,20 +898,24 @@ TEST_F(GcsAutoscalerStateManagerTest,
                       {Mocker::GenResourceDemand({{"GPU", 1}},
                                                  /* nun_ready_queued */ 1,
                                                  /* nun_infeasible */ 1,
-                                                 /* num_backlog */ 0),
+                                                 /* num_backlog */ 0,
+                                                 /* label_selectors */ {}),
                        Mocker::GenResourceDemand({{"CPU", 1}},
                                                  /* nun_ready_queued */ 1,
                                                  /* nun_infeasible */ 0,
-                                                 /* num_backlog */ 1),
+                                                 /* num_backlog */ 1,
+                                                 /* label_selectors */ {}),
                        Mocker::GenResourceDemand({{"CPU", 3}},
                                                  /* num_ready_queued */ 0,
                                                  /* num_infeasible */ 1,
-                                                 /* num_backlog */ 1)});
+                                                 /* num_backlog */ 1,
+                                                 /* label_selectors */ {})});
   UpdateResourceLoads(node_2->node_id(),
                       {Mocker::GenResourceDemand({{"CPU", 2}},
                                                  /* nun_ready_queued */ 1,
                                                  /* nun_infeasible */ 0,
-                                                 /* num_backlog */ 1)});
+                                                 /* num_backlog */ 1,
+                                                 /* label_selectors */ {})});
 }
 
 // Update autoscaling state
@@ -947,19 +962,23 @@ TEST_F(GcsAutoscalerStateManagerTest,
                                                  /* nun_ready_queued */ 1,
                                                  /* nun_infeasible */ 1,
-                                                 /* num_backlog */ 0),
+                                                 /* num_backlog */ 0,
+                                                 /* label_selectors */ {}),
                        Mocker::GenResourceDemand({{"CPU", 1}},
                                                  /* nun_ready_queued */ 1,
                                                  /* nun_infeasible */ 0,
-                                                 /* num_backlog */ 1),
+                                                 /* num_backlog */ 1,
+                                                 /* label_selectors */ {}),
                        Mocker::GenResourceDemand({{"CPU", 3}},
                                                  /* num_ready_queued */ 0,
                                                  /* num_infeasible */ 1,
-                                                 /* num_backlog */ 1)});
+                                                 /* num_backlog */ 1,
+                                                 /* label_selectors */ {})});
   UpdateResourceLoads(node_2->node_id(),
                       {Mocker::GenResourceDemand({{"CPU", 2}},
                                                  /* nun_ready_queued */ 1,
                                                  /* nun_infeasible */ 0,
-                                                 /* num_backlog */ 1)});
+                                                 /* num_backlog */ 1,
+                                                 /* label_selectors */ {})});
 }
 
 // Update autoscaling state
@@ -1001,5 +1020,136 @@ TEST_F(GcsAutoscalerStateManagerTest,
   }
 }
 
+TEST_F(GcsAutoscalerStateManagerTest, TestNodeLabelsAdded) {
+  auto node = Mocker::GenNodeInfo();
+  node->mutable_resources_total()->insert({"CPU", 2});
+  node->set_instance_id("instance_1");
+  (*node->mutable_labels())["accelerator-type"] = "TPU";
+  (*node->mutable_labels())["region"] = "us-central1";
+  AddNode(node);
+
+  const auto &state = GetClusterResourceStateSync();
+  ASSERT_EQ(state.node_states_size(), 1);
+
+  CheckNodeLabels(state.node_states(0),
+                  {{"accelerator-type", "TPU"}, {"region", "us-central1"}});
+}
+
+TEST_F(GcsAutoscalerStateManagerTest, TestGetPendingResourceRequestsWithLabelSelectors) {
+  auto node = Mocker::GenNodeInfo();
+  node->mutable_resources_total()->insert({"CPU", 2});
+  node->set_instance_id("instance_1");
+  AddNode(node);
+
+  // Add label selector to ResourceDemand
+  {
+    rpc::LabelSelector selector;
+
+    auto add_constraint = [&](const std::string &key,
+                              rpc::LabelSelectorOperator op,
+                              const std::string &value) {
+      auto *constraint = selector.add_label_constraints();
+      constraint->set_label_key(key);
+      constraint->set_operator_(op);
+      constraint->add_label_values(value);
+    };
+
+    add_constraint("accelerator-type", rpc::LABEL_OPERATOR_IN, "TPU");
+    add_constraint("node-group", rpc::LABEL_OPERATOR_NOT_IN, "gpu-group");
+    add_constraint("market-type", rpc::LABEL_OPERATOR_IN, "spot");
+    add_constraint("region", rpc::LABEL_OPERATOR_NOT_IN, "us-west4");
+
+    // Simulate an infeasible request with a label selector
+    UpdateResourceLoads(node->node_id(),
+                        {Mocker::GenResourceDemand({{"CPU", 2}},
+                                                   /*ready=*/0,
+                                                   /*infeasible=*/1,
+                                                   /*backlog=*/0,
+                                                   {selector})});
+  }
+
+  // Validate the cluster state includes the generated pending request
+  {
+    const auto &state = GetClusterResourceStateSync();
+    ASSERT_EQ(state.pending_resource_requests_size(), 1);
+
+    const auto &req = state.pending_resource_requests(0);
+    ASSERT_EQ(req.count(), 1);
+    CheckResourceRequest(req.request(), {{"CPU", 2}});
+
+    std::unordered_map<std::string, std::pair<rpc::LabelSelectorOperator, std::string>>
+        expected_vals = {
+            {"accelerator-type", {rpc::LABEL_OPERATOR_IN, "TPU"}},
+            {"node-group", {rpc::LABEL_OPERATOR_NOT_IN, "gpu-group"}},
+            {"market-type", {rpc::LABEL_OPERATOR_IN, "spot"}},
+            {"region", {rpc::LABEL_OPERATOR_NOT_IN, "us-west4"}},
+        };
+
+    ASSERT_EQ(req.request().label_selectors_size(), 1);
+    const auto &parsed_selector = req.request().label_selectors(0);
+    ASSERT_EQ(parsed_selector.label_constraints_size(), expected_vals.size());
+
+    for (const auto &constraint : parsed_selector.label_constraints()) {
+      const auto it = expected_vals.find(constraint.label_key());
+      ASSERT_NE(it, expected_vals.end())
+          << "Unexpected label key: " << constraint.label_key();
+      ASSERT_EQ(constraint.operator_(), it->second.first);
+      ASSERT_EQ(constraint.label_values_size(), 1);
+      ASSERT_EQ(constraint.label_values(0), it->second.second);
+    }
+  }
+}
+
+TEST_F(GcsAutoscalerStateManagerTest,
+       TestGetPendingGangResourceRequestsWithBundleSelectors) {
+  rpc::PlacementGroupLoad load;
+
+  // Create PG with two bundles with different label selectors
+  auto *pg_data = load.add_placement_group_data();
+  pg_data->set_state(rpc::PlacementGroupTableData::PENDING);
+  auto pg_id = PlacementGroupID::Of(JobID::FromInt(1));
+  pg_data->set_placement_group_id(pg_id.Binary());
+
+  auto *bundle1 = pg_data->add_bundles();
+  (*bundle1->mutable_unit_resources())["CPU"] = 2;
+  (*bundle1->mutable_unit_resources())["GPU"] = 1;
+  (*bundle1->mutable_label_selector())["accelerator"] = "in(A100,B200)";
+
+  auto *bundle2 = pg_data->add_bundles();
+  (*bundle2->mutable_unit_resources())["CPU"] = 4;
+  (*bundle2->mutable_label_selector())["accelerator"] = "!in(TPU)";
+
+  EXPECT_CALL(*gcs_placement_group_manager_, GetPlacementGroupLoad)
+      .WillOnce(Return(std::make_shared<rpc::PlacementGroupLoad>(std::move(load))));
+
+  const auto &state = GetClusterResourceStateSync();
+  const auto &requests = state.pending_gang_resource_requests();
+  ASSERT_EQ(requests.size(), 1);
+
+  const auto &req = requests.Get(0);
+  ASSERT_EQ(req.bundle_selectors_size(), 2);
+
+  const auto &r1 = req.bundle_selectors(0).resource_requests(0);
+  const auto &r2 = req.bundle_selectors(1).resource_requests(0);
+
+  ASSERT_EQ(r1.label_selectors_size(), 1);
+  ASSERT_EQ(r2.label_selectors_size(), 1);
+
+  const auto &c1 = r1.label_selectors(0).label_constraints(0);
+  const auto &c2 = r2.label_selectors(0).label_constraints(0);
+
+  EXPECT_EQ(c1.label_key(), "accelerator");
+  EXPECT_EQ(c1.operator_(), rpc::LabelSelectorOperator::LABEL_OPERATOR_IN);
+  ASSERT_EQ(c1.label_values_size(), 2);
+  EXPECT_THAT(absl::flat_hash_set<std::string>(c1.label_values().begin(),
+                                               c1.label_values().end()),
+              ::testing::UnorderedElementsAre("A100", "B200"));
+
+  EXPECT_EQ(c2.label_key(), "accelerator");
+  EXPECT_EQ(c2.operator_(), rpc::LabelSelectorOperator::LABEL_OPERATOR_NOT_IN);
+  ASSERT_EQ(c2.label_values_size(), 1);
+  EXPECT_EQ(c2.label_values(0), "TPU");
+}
+
 }  // namespace gcs
 }  // namespace ray
diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h
index fd46de5f3df0..56379b5a1154 100644
--- a/src/ray/gcs/test/gcs_test_util.h
+++ b/src/ray/gcs/test/gcs_test_util.h
@@ -327,7 +327,8 @@ struct Mocker {
       const absl::flat_hash_map<std::string, double> &resource_demands,
       int64_t num_ready_queued,
       int64_t num_infeasible,
-      int64_t num_backlog) {
+      int64_t num_backlog,
+      const std::vector<rpc::LabelSelector> &label_selectors = {}) {
     rpc::ResourceDemand resource_demand;
     for (const auto &resource : resource_demands) {
      (*resource_demand.mutable_shape())[resource.first] = resource.second;
@@ -335,6 +336,9 @@ struct Mocker {
     resource_demand.set_num_ready_requests_queued(num_ready_queued);
     resource_demand.set_num_infeasible_requests_queued(num_infeasible);
     resource_demand.set_backlog_size(num_backlog);
+    for (const auto &selector : label_selectors) {
+      *resource_demand.add_label_selectors() = selector;
+    }
     return resource_demand;
   }
 
diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD
index d45cb48fd04d..9a91cdee0c05 100644
--- a/src/ray/protobuf/BUILD
+++ b/src/ray/protobuf/BUILD
@@ -351,6 +351,10 @@ cc_proto_library(
 proto_library(
     name = "autoscaler_proto",
     srcs = ["autoscaler.proto"],
+    deps = [
+        ":common_proto",
+        ":runtime_env_common_proto",
+    ],
 )
 
 cc_proto_library(
diff --git a/src/ray/protobuf/autoscaler.proto b/src/ray/protobuf/autoscaler.proto
index c9faadbb3490..9ad2ef7b191c 100644
--- a/src/ray/protobuf/autoscaler.proto
+++ b/src/ray/protobuf/autoscaler.proto
@@ -16,6 +16,8 @@ syntax = "proto3";
 
 package ray.rpc.autoscaler;
 
+import "src/ray/protobuf/common.proto";
+
 // ============= Cluster Resources ====================
 //
 // Following fields represents the Cluster Resources autoscaler interested
@@ -51,32 +53,6 @@ message PlacementConstraint {
   optional AffinityConstraint affinity = 2;
 }
 
-// The type of operator to use for the label constraint.
-enum LabelOperator {
-  LABEL_OPERATOR_UNSPECIFIED = 0;
-  // This is to support equality or in semantics.
-  LABEL_OPERATOR_IN = 1;
-  // This is to support not equal or not in semantics.
-  LABEL_OPERATOR_NOT_IN = 2;
-}
-
-// A node label constraint with a key, one or a list of values and an operator.
-message LabelConstraint {
-  // The key of the label
-  string label_key = 1;
-  // The operator to use for the label constraint.
-  LabelOperator operator = 2;
-  // The values to check against.
-  repeated string label_values = 3;
-}
-
-// A list of node label constraints to specify the label requirements in a
-// resource request.
-message LabelSelector {
-  // The list of node label constraints with AND semantics.
-  repeated LabelConstraint label_constraints = 1;
-}
-
 message ResourceRequest {
   // resource requirements for the request.
   map<string, double> resources_bundle = 1;
@@ -195,6 +171,10 @@ message NodeState {
 
   // Observability debug string describing why the node is not idle.
   repeated string node_activity = 12;
+
+  // Labels associated with this node. `ray.io/` labels set by
+  // default by Ray or specified by the user at node init.
+  map<string, string> labels = 13;
 }
 
 // ============= Autoscaling State Service API =======================
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index 5774cd36f78f..bd59d4d3732b 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -615,10 +615,6 @@ message TaskInfoEntry {
   map<string, string> label_selector = 28;
 }
 
-message LabelSelector {
-  map<string, string> label_selector_dict = 1;
-}
-
 message TaskAttempt {
   // The task id of the task attempt.
   bytes task_id = 1;
@@ -1029,6 +1025,32 @@ enum PlacementStrategy {
   // The group is not allowed to deploy more than one bundle on a node.
   STRICT_SPREAD = 3;
 }
+
+// The type of operator to use for the label constraint.
+enum LabelSelectorOperator {
+  // This is to support equality or in semantics.
+  LABEL_OPERATOR_IN = 0;
+  // This is to support not equal or not in semantics.
+  LABEL_OPERATOR_NOT_IN = 1;
+}
+
+// A node label constraint with a key, one or a list of values and an operator.
+message LabelSelectorConstraint {
+  // The key of the label
+  string label_key = 1;
+  // The operator to use for the label constraint.
+  LabelSelectorOperator operator = 2;
+  // The values to check against.
+  repeated string label_values = 3;
+}
+
+// A list of node label constraints to specify the label requirements in a
+// resource request.
+message LabelSelector {
+  // The list of node label constraints with AND semantics.
+  repeated LabelSelectorConstraint label_constraints = 1;
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // Info about a named actor.
diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto
index f3acd695f704..8dbbc8cbb3fd 100644
--- a/src/ray/protobuf/gcs.proto
+++ b/src/ray/protobuf/gcs.proto
@@ -493,6 +493,8 @@ message ResourceDemand {
   // The number of requests of this shape still queued in CoreWorkers that this
   // raylet knows about.
   int64 backlog_size = 4;
+  // The label selector constraints for this Resource shape on a node.
+  repeated LabelSelector label_selectors = 5;
 }
 
 // Represents the demand sorted by resource shape.
@@ -539,6 +541,8 @@ message ResourcesData {
   // The value is the timestamp when
   // the node will be force killed.
   int64 draining_deadline_timestamp_ms = 18;
+  // The key-value labels of this node.
+  map<string, string> labels = 19;
 }
 
 message ResourceUsageBatchData {
diff --git a/src/ray/raylet/scheduling/scheduler_resource_reporter.cc b/src/ray/raylet/scheduling/scheduler_resource_reporter.cc
index 6704c7f2dc25..0523851724a0 100644
--- a/src/ray/raylet/scheduling/scheduler_resource_reporter.cc
+++ b/src/ray/raylet/scheduling/scheduler_resource_reporter.cc
@@ -96,6 +96,7 @@ void SchedulerResourceReporter::FillResourceUsage(rpc::ResourcesData &data) cons
     }
 
     const auto &resources = scheduling_class_descriptor.resource_set.GetResourceMap();
+    const auto &label_selectors = scheduling_class_descriptor.label_selector;
     auto by_shape_entry = resource_load_by_shape->Add();
 
     for (const auto &resource : resources) {
@@ -110,6 +111,9 @@ void SchedulerResourceReporter::FillResourceUsage(rpc::ResourcesData &data) cons
       (*by_shape_entry->mutable_shape())[label] = quantity;
     }
 
+    // Add label selectors
+    *by_shape_entry->add_label_selectors() = label_selectors.ToProto();
+
     if (is_infeasible) {
       by_shape_entry->set_num_infeasible_requests_queued(count);
     } else {