def validate_label_selector(label_selector: Optional[Dict[str, str]]) -> Optional[str]:
    """Check every key and value of a label selector.

    Args:
        label_selector: Mapping of label key to selector value, or None.

    Returns:
        None when ``label_selector`` is None or every entry passes; otherwise
        the first non-empty message returned by ``validate_label_key`` (for a
        key) or ``validate_label_selector_value`` (for a non-None value).
    """
    if label_selector is None:
        return None

    for key, value in label_selector.items():
        error = validate_label_key(key)
        # The value is only inspected when the key passed and a value is set.
        if not error and value is not None:
            error = validate_label_selector_value(value)
        if error:
            return error

    return None
_validate_label_selector(x)), + "label_selector": Option((dict, type(None)), lambda x: validate_label_selector(x)), "accelerator_type": Option((str, type(None))), "memory": _resource_option("memory"), "name": Option((str, type(None))), diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 5711f0273efa..ab91c9cf00dd 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3852,7 +3852,8 @@ cdef class CoreWorker: c_string strategy, c_bool is_detached, double max_cpu_fraction_per_node, - soft_target_node_id): + soft_target_node_id, + c_vector[unordered_map[c_string, c_string]] bundle_label_selector): cdef: CPlacementGroupID c_placement_group_id CPlacementStrategy c_strategy @@ -3883,7 +3884,8 @@ cdef class CoreWorker: bundles, is_detached, max_cpu_fraction_per_node, - c_soft_target_node_id), + c_soft_target_node_id, + bundle_label_selector), &c_placement_group_id)) return PlacementGroupID(c_placement_group_id.Binary()) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index f4581f319a31..8efe91235bae 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -365,6 +365,7 @@ cdef extern from "ray/core_worker/common.h" nogil: c_bool is_detached, double max_cpu_fraction_per_node, CNodeID soft_target_node_id, + const c_vector[unordered_map[c_string, c_string]] &bundle_label_selector, ) cdef cppclass CObjectLocation "ray::core::ObjectLocation": diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index f58888e4a1f3..fbc81c938c99 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -386,6 +386,7 @@ py_test_module_list( py_test_module_list( size = "medium", files = [ + "test_bundle_label_selector.py", "test_label_scheduling.py", "test_minimal_install.py", "test_path_utils.py", diff --git a/python/ray/tests/test_bundle_label_selector.py b/python/ray/tests/test_bundle_label_selector.py new file mode 100644 index 000000000000..890fa2845dcc --- /dev/null +++ 
import sys
import os

import pytest

import ray

from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray._private.test_utils import placement_group_assert_no_leak


def test_bundle_label_selector_with_repeated_labels(ray_start_cluster):
    """Two bundles using the same selector are placed on a cluster of A100-labelled nodes."""
    cluster = ray_start_cluster
    node_count = 2
    for _ in range(node_count):
        cluster.add_node(num_cpus=4, labels={"ray.io/accelerator-type": "A100"})
    ray.init(address=cluster.address)

    selectors = [{"ray.io/accelerator-type": "A100"}] * 2
    pg = ray.util.placement_group(
        name="repeated_labels_pg",
        strategy="PACK",
        bundles=[{"CPU": 1}, {"CPU": 1}],
        bundle_label_selector=selectors,
    )
    ray.get(pg.ready())

    placement_group_assert_no_leak([pg])


def test_unschedulable_bundle_label_selector(ray_start_cluster):
    """A STRICT_PACK group whose selectors cannot be satisfied is reported INFEASIBLE."""
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1, labels={"ray.io/accelerator-type": "A100"})
    cluster.add_node(num_cpus=1, labels={"ray.io/accelerator-type": "TPU"})
    ray.init(address=cluster.address)

    # Two 1-CPU bundles are requested, but only a single CPU sits on a node
    # labelled ray.io/accelerator-type=A100.
    selectors = [{"ray.io/accelerator-type": "A100"}] * 2
    pg = ray.util.placement_group(
        name="unschedulable_labels_pg",
        strategy="STRICT_PACK",
        bundles=[{"CPU": 1}, {"CPU": 1}],
        bundle_label_selector=selectors,
    )

    with pytest.raises(ray.exceptions.GetTimeoutError):
        ray.get(pg.ready(), timeout=3)

    pg_entry = ray.util.placement_group_table()[pg.id.hex()]
    assert pg_entry["stats"]["scheduling_state"] == "INFEASIBLE"


def test_bundle_label_selectors_match_bundle_resources(ray_start_cluster):
    """Bundle i lands on the node matching selector i and carries bundle i's resources."""
    cluster = ray_start_cluster

    # One node per region, each exposing its own custom resource.
    cluster.add_node(
        num_cpus=1, resources={"resource-0": 1}, labels={"region": "us-west4"}
    )
    cluster.add_node(
        num_cpus=1, resources={"resource-1": 1}, labels={"region": "us-east5"}
    )
    cluster.add_node(
        num_cpus=1, resources={"resource-2": 1}, labels={"region": "us-central2"}
    )
    cluster.wait_for_nodes()

    ray.init(address=cluster.address)

    # Selectors line up index-for-index with the node labels added above.
    bundle_label_selectors = [
        {"region": "us-west4"},
        {"region": "us-east5"},
        {"region": "us-central2"},
    ]

    # Every bundle asks for one CPU plus the custom resource of "its" node.
    bundles = [
        {"CPU": 1, "resource-0": 1},
        {"CPU": 1, "resource-1": 1},
        {"CPU": 1, "resource-2": 1},
    ]

    pg = ray.util.placement_group(
        name="label_selectors_match_resources",
        strategy="SPREAD",
        bundles=bundles,
        bundle_label_selector=bundle_label_selectors,
    )
    ray.get(pg.ready())

    @ray.remote
    def get_assigned_resources():
        ctx = ray.get_runtime_context()
        return (ctx.get_node_id(), ctx.get_assigned_resources())

    region_by_node_id = {
        node["NodeID"]: node["Labels"]["region"] for node in ray.nodes()
    }

    # Run one task inside each bundle and inspect where it ran and what it got.
    for index, selector in enumerate(bundle_label_selectors):
        resource_name = f"resource-{index}"
        strategy = PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=index
        )
        task = get_assigned_resources.options(
            num_cpus=1,
            resources={resource_name: 1},
            scheduling_strategy=strategy,
        )
        node_id, assigned = ray.get(task.remote())

        # The task's node carries the region requested by this bundle's selector.
        assert region_by_node_id[node_id] == selector["region"]

        # The bundle's custom resource was handed to the task.
        assert resource_name in assigned
        assert assigned[resource_name] == 1.0

        # So was the CPU.
        assert "CPU" in assigned and assigned["CPU"] == 1.0


if __name__ == "__main__":
    pytest_args = ["-sv", __file__]
    if os.environ.get("PARALLEL_CI"):
        pytest_args = ["-n", "auto", "--boxed", "-vs", __file__]
    sys.exit(pytest.main(pytest_args))
def test_bundle_label_selector_validation(self):
    """Exercise the accept and reject paths of _validate_bundle_label_selector()."""

    # Accepted inputs: an empty list (no-op) and a list of string dictionaries.
    _validate_bundle_label_selector([])
    _validate_bundle_label_selector(
        [
            {"ray.io/market_type": "spot"},
            {"ray.io/accelerator-type": "A100"},
        ]
    )

    # Anything that is not a list is rejected outright.
    with pytest.raises(ValueError, match="must be a list"):
        _validate_bundle_label_selector("not a list")

    # Entries must be dictionaries with string keys and string values.
    malformed_inputs = [
        ["not a dict", {"valid": "label"}],  # non-dictionary element
        [{1: "value"}, {"key": "val"}],  # non-string key
        [{"key": 123}, {"valid": "label"}],  # non-string value
    ]
    for malformed in malformed_inputs:
        with pytest.raises(ValueError, match="must be a list of string dictionary"):
            _validate_bundle_label_selector(malformed)

    # Key/value syntax errors come from validate_label_selector and are wrapped.
    with pytest.raises(ValueError, match="Invalid label selector provided"):
        _validate_bundle_label_selector([{"INVALID key!": "value"}])
def _validate_bundle_label_selector(bundle_label_selector: List[Dict[str, str]]):
    """Validate the per-bundle label selectors of a placement group.

    Args:
        bundle_label_selector: A list of label selectors. Each selector is a
            dictionary whose keys and values are all strings. An empty list
            is accepted and treated as a no-op.

    Raises:
        ValueError: If ``bundle_label_selector`` is not a list, if any entry
            is not a dictionary of string keys and string values, or if
            ``validate_label_selector`` returns an error message for an entry.
    """
    if not isinstance(bundle_label_selector, list):
        raise ValueError(
            "Placement group bundle_label_selector must be a list, "
            f"got {type(bundle_label_selector)}."
        )

    # An empty list needs no special case: the loop below does nothing.
    for label_selector in bundle_label_selector:
        if not isinstance(label_selector, dict) or not all(
            isinstance(key, str) and isinstance(value, str)
            for key, value in label_selector.items()
        ):
            raise ValueError(
                "Bundle label selector must be a list of string dictionary"
                " label selectors. For example: "
                '`[{"ray.io/market_type": "spot"}, {"ray.io/accelerator-type": "A100"}]`.'
            )
        # Type checks passed; delegate key/value syntax checks to the shared
        # helper, which returns an error message or None.
        error_message = validate_label_selector(label_selector)
        if error_message:
            raise ValueError(
                "Invalid label selector provided in bundle_label_selector list."
                f" Detailed error: '{error_message}'"
            )
diff --git a/src/ray/common/placement_group.h b/src/ray/common/placement_group.h index 223ec9afc5ea..e20776e3aa5b 100644 --- a/src/ray/common/placement_group.h +++ b/src/ray/common/placement_group.h @@ -88,7 +88,9 @@ class PlacementGroupSpecBuilder { NodeID soft_target_node_id, const JobID &creator_job_id, const ActorID &creator_actor_id, - bool is_creator_detached_actor) { + bool is_creator_detached_actor, + const std::vector> + &bundle_label_selector = {}) { message_->set_placement_group_id(placement_group_id.Binary()); message_->set_name(name); message_->set_strategy(strategy); @@ -122,6 +124,13 @@ class PlacementGroupSpecBuilder { mutable_unit_resources->insert({current->first, current->second}); } } + // Set the label selector for this bundle if provided in bundle_label_selector. + if (bundle_label_selector.size() > i) { + auto *mutable_label_selector = message_bundle->mutable_label_selector(); + for (const auto &pair : bundle_label_selector[i]) { + (*mutable_label_selector)[pair.first] = pair.second; + } + } } return *this; } diff --git a/src/ray/common/scheduling/cluster_resource_data.h b/src/ray/common/scheduling/cluster_resource_data.h index 66baa61be3bd..4991cb0c518b 100644 --- a/src/ray/common/scheduling/cluster_resource_data.h +++ b/src/ray/common/scheduling/cluster_resource_data.h @@ -61,8 +61,8 @@ class ResourceRequest { const LabelSelector &GetLabelSelector() const { return label_selector_; } - void SetLabelSelector(const LabelSelector &label_selector) { - label_selector_ = label_selector; + void SetLabelSelector(LabelSelector label_selector) { + label_selector_ = std::move(label_selector); } FixedPoint Get(ResourceID resource_id) const { return resources_.Get(resource_id); } diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 039fb5b063df..62ca3b5cbb05 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -216,13 +216,16 @@ struct PlacementGroupCreationOptions { std::vector> bundles, bool 
is_detached_p, double max_cpu_fraction_per_node, - NodeID soft_target_node_id = NodeID::Nil()) + NodeID soft_target_node_id = NodeID::Nil(), + std::vector> bundle_label_selector = + {}) : name(std::move(name)), strategy(strategy), bundles(std::move(bundles)), is_detached(is_detached_p), max_cpu_fraction_per_node(max_cpu_fraction_per_node), - soft_target_node_id(soft_target_node_id) { + soft_target_node_id(soft_target_node_id), + bundle_label_selector(std::move(bundle_label_selector)) { RAY_CHECK(soft_target_node_id.IsNil() || strategy == PlacementStrategy::STRICT_PACK) << "soft_target_node_id only works with STRICT_PACK now"; } @@ -243,6 +246,8 @@ struct PlacementGroupCreationOptions { /// Nil means there is no target node. /// This only applies to STRICT_PACK pg. const NodeID soft_target_node_id; + /// The label selectors to apply per-bundle in this placement group. + const std::vector> bundle_label_selector; }; class ObjectLocation { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5291046ce9d9..9d8d3c84d21c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2782,7 +2782,8 @@ Status CoreWorker::CreatePlacementGroup( placement_group_creation_options.soft_target_node_id, worker_context_.GetCurrentJobID(), worker_context_.GetCurrentActorID(), - worker_context_.CurrentActorDetached()); + worker_context_.CurrentActorDetached(), + placement_group_creation_options.bundle_label_selector); PlacementGroupSpecification placement_group_spec = builder.Build(); *return_placement_group_id = placement_group_id; RAY_LOG(INFO).WithField(placement_group_id) diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index e8e779e9e8f1..81a12b8a3a6a 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -608,6 +608,10 @@ message TaskInfoEntry { optional string call_site = 27; } +message LabelSelector { + map label_selector_dict = 1; +} + message TaskAttempt { 
// The task id of the task attempt. bytes task_id = 1; @@ -624,6 +628,9 @@ message Bundle { map unit_resources = 2; // The location of this bundle. bytes node_id = 3; + // Label selector associated with this bundle. + // Populated from bundle_label_selector if provided. + map label_selector = 4; } message PlacementGroupSpec {