Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions python/ray/_private/label_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@ def validate_label_value(value: str):
)


def validate_label_selector(label_selector: Optional[Dict[str, str]]) -> Optional[str]:
    """Validate every key and value of a label selector mapping.

    Args:
        label_selector: Mapping of label keys to selector values, or None.

    Returns:
        The first truthy message produced by ``validate_label_key`` or
        ``validate_label_selector_value``, or None when the selector is
        None or every entry passes.
    """
    if label_selector is None:
        return None

    for key, value in label_selector.items():
        key_error = validate_label_key(key)
        if key_error:
            return key_error

        # A None value is not checked; only the key is validated for it.
        if value is None:
            continue

        value_error = validate_label_selector_value(value)
        if value_error:
            return value_error

    return None


def validate_label_selector_value(selector: str) -> Optional[str]:
if selector == "":
return None
Expand Down
21 changes: 2 additions & 19 deletions python/ray/_private/ray_option_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import ray
from ray._private import ray_constants
from ray._private.label_utils import (
validate_label_key,
validate_label_selector_value,
validate_label_selector,
)
from ray._private.utils import get_ray_doc_version
from ray.util.placement_group import PlacementGroup
Expand Down Expand Up @@ -123,24 +122,8 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
return None


def _validate_label_selector(label_selector: Optional[Dict[str, str]]) -> Optional[str]:
if label_selector is None:
return None

for key, value in label_selector.items():
possible_error_message = validate_label_key(key)
if possible_error_message:
return possible_error_message
if value is not None:
possible_error_message = validate_label_selector_value(value)
if possible_error_message:
return possible_error_message

return None


_common_options = {
"label_selector": Option((dict, type(None)), lambda x: _validate_label_selector(x)),
"label_selector": Option((dict, type(None)), lambda x: validate_label_selector(x)),
"accelerator_type": Option((str, type(None))),
"memory": _resource_option("memory"),
"name": Option((str, type(None))),
Expand Down
6 changes: 4 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3852,7 +3852,8 @@ cdef class CoreWorker:
c_string strategy,
c_bool is_detached,
double max_cpu_fraction_per_node,
soft_target_node_id):
soft_target_node_id,
c_vector[unordered_map[c_string, c_string]] bundle_label_selector):
cdef:
CPlacementGroupID c_placement_group_id
CPlacementStrategy c_strategy
Expand Down Expand Up @@ -3883,7 +3884,8 @@ cdef class CoreWorker:
bundles,
is_detached,
max_cpu_fraction_per_node,
c_soft_target_node_id),
c_soft_target_node_id,
bundle_label_selector),
&c_placement_group_id))

return PlacementGroupID(c_placement_group_id.Binary())
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
c_bool is_detached,
double max_cpu_fraction_per_node,
CNodeID soft_target_node_id,
const c_vector[unordered_map[c_string, c_string]] &bundle_label_selector,
)

cdef cppclass CObjectLocation "ray::core::ObjectLocation":
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ py_test_module_list(
py_test_module_list(
size = "medium",
files = [
"test_bundle_label_selector.py",
"test_label_scheduling.py",
"test_minimal_install.py",
"test_path_utils.py",
Expand Down
137 changes: 137 additions & 0 deletions python/ray/tests/test_bundle_label_selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import sys
import os

import pytest

import ray

from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray._private.test_utils import placement_group_assert_no_leak


def test_bundle_label_selector_with_repeated_labels(ray_start_cluster):
    """Create a PACK placement group whose two bundles share one label selector.

    The cluster gets two 4-CPU nodes that both carry the A100 accelerator
    label; the test waits on the group's ``ready()`` reference and then runs
    ``placement_group_assert_no_leak`` on it.
    """
    cluster = ray_start_cluster
    for _ in range(2):
        cluster.add_node(num_cpus=4, labels={"ray.io/accelerator-type": "A100"})
    ray.init(address=cluster.address)

    # The same selector is used for both bundles.
    a100_selector = {"ray.io/accelerator-type": "A100"}
    pg = ray.util.placement_group(
        name="repeated_labels_pg",
        strategy="PACK",
        bundles=[{"CPU": 1} for _ in range(2)],
        bundle_label_selector=[a100_selector, a100_selector],
    )
    ray.get(pg.ready())

    placement_group_assert_no_leak([pg])


def test_unschedulable_bundle_label_selector(ray_start_cluster):
    """Expect a placement group to be INFEASIBLE when its selectors cannot be met.

    The cluster has one 1-CPU node labelled A100 and one 1-CPU node labelled
    TPU, while both 1-CPU bundles select the A100 label. Waiting on
    ``ready()`` must time out, and the placement group table must report the
    scheduling state "INFEASIBLE".
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1, labels={"ray.io/accelerator-type": "A100"})
    cluster.add_node(num_cpus=1, labels={"ray.io/accelerator-type": "TPU"})
    ray.init(address=cluster.address)

    # The bundles need 2 CPUs in total on A100-labelled nodes, but only a
    # single 1-CPU node carries ray.io/accelerator-type=A100.
    # NOTE(review): STRICT_PACK presumably requires both bundles on one node,
    # and no node here has 2 CPUs, so this may be infeasible even if the
    # selectors were ignored -- confirm the test really exercises label matching.
    a100_selector = {"ray.io/accelerator-type": "A100"}
    pg = ray.util.placement_group(
        name="unschedulable_labels_pg",
        strategy="STRICT_PACK",
        bundles=[{"CPU": 1}, {"CPU": 1}],
        bundle_label_selector=[a100_selector] * 2,
    )

    with pytest.raises(ray.exceptions.GetTimeoutError):
        ray.get(pg.ready(), timeout=3)

    pg_entry = ray.util.placement_group_table()[pg.id.hex()]
    assert pg_entry["stats"]["scheduling_state"] == "INFEASIBLE"


def test_bundle_label_selectors_match_bundle_resources(ray_start_cluster):
    """Check that bundle i is placed according to label selector i.

    Three 1-CPU nodes are added, each with a distinct ``region`` label and a
    distinct custom resource ``resource-<i>``. A SPREAD placement group is
    created with one bundle per node, and one task per bundle index reports
    the node it ran on and the resources it was assigned.
    """
    cluster = ray_start_cluster

    # Node i carries the label region=regions[i] and the resource "resource-i".
    regions = ["us-west4", "us-east5", "us-central2"]
    for idx, region in enumerate(regions):
        cluster.add_node(
            num_cpus=1, resources={f"resource-{idx}": 1}, labels={"region": region}
        )
    cluster.wait_for_nodes()

    ray.init(address=cluster.address)

    # One selector per bundle, mirroring the node labels above.
    bundle_label_selectors = [{"region": region} for region in regions]

    # Each bundle asks for a CPU plus the custom resource of "its" node.
    bundles = [{"CPU": 1, f"resource-{idx}": 1} for idx in range(len(regions))]

    pg = ray.util.placement_group(
        name="label_selectors_match_resources",
        strategy="SPREAD",
        bundles=bundles,
        bundle_label_selector=bundle_label_selectors,
    )
    ray.get(pg.ready())

    @ray.remote
    def get_assigned_resources():
        return (
            ray.get_runtime_context().get_node_id(),
            ray.get_runtime_context().get_assigned_resources(),
        )

    # Map each node id to its region label so results can be checked by node.
    node_id_to_label = {}
    for node in ray.nodes():
        node_id_to_label[node["NodeID"]] = node["Labels"]["region"]

    # Run one task inside every bundle and inspect where it landed.
    for i, selector in enumerate(bundle_label_selectors):
        custom_resource = f"resource-{i}"
        task = get_assigned_resources.options(
            num_cpus=1,
            resources={custom_resource: 1},
            scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=pg, placement_group_bundle_index=i
            ),
        )
        node_id, assigned = ray.get(task.remote())

        # The task's node must carry the region requested for this bundle.
        assert node_id_to_label[node_id] == selector["region"]

        # The bundle's custom resource must have been assigned to the task.
        assert custom_resource in assigned
        assert assigned[custom_resource] == 1.0

        # The CPU must have been assigned as well.
        assert "CPU" in assigned
        assert assigned["CPU"] == 1.0


if __name__ == "__main__":
    # When PARALLEL_CI is set in the environment, pass the parallel/boxed
    # pytest flags; otherwise run this file with "-sv" only.
    pytest_args = (
        ["-n", "auto", "--boxed", "-vs", __file__]
        if os.environ.get("PARALLEL_CI")
        else ["-sv", __file__]
    )
    sys.exit(pytest.main(pytest_args))
26 changes: 26 additions & 0 deletions python/ray/tests/test_label_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
validate_node_labels,
validate_label_key,
validate_label_value,
validate_label_selector,
validate_label_selector_value,
validate_node_label_syntax,
)
Expand Down Expand Up @@ -230,6 +231,31 @@ def test_validate_label_value(value, should_raise, expected_message):
validate_label_value(value)


@pytest.mark.parametrize(
    "label_selector, expected_error",
    [
        # Valid: no input provided.
        (None, None),
        # Valid label key and value.
        ({"region": "us-west4"}, None),
        ({"ray.io/accelerator-type": "A100"}, None),
        # Invalid label key (empty).
        ({"": "valid-value"}, "Invalid label key name"),
        # Invalid label key syntax.
        ({"!-invalidkey": "valid-value"}, "Invalid label key name"),
        # Invalid label value syntax (64-character value).
        ({"valid-key": "a" * 64}, "Invalid label selector value"),
    ],
)
def test_validate_label_selector(label_selector, expected_error):
    """Check validate_label_selector() against the expected message.

    When ``expected_error`` is falsy the validator must return None;
    otherwise ``expected_error`` must be a substring of what it returns.
    """
    error = validate_label_selector(label_selector)
    if not expected_error:
        assert error is None
    else:
        assert expected_error in error


@pytest.mark.parametrize(
"selector, expected_error",
[
Expand Down
32 changes: 32 additions & 0 deletions python/ray/tests/test_placement_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ray.util.placement_group import (
validate_placement_group,
_validate_bundles,
_validate_bundle_label_selector,
VALID_PLACEMENT_GROUP_STRATEGIES,
)

Expand Down Expand Up @@ -703,6 +704,37 @@ def test_bundle_validation(self):
with pytest.raises(ValueError, match="only 0 values"):
_validate_bundles([{"CPU": 0, "GPU": 0}])

def test_bundle_label_selector_validation(self):
    """Exercise _validate_bundle_label_selector() with valid and invalid input."""

    # A list of well-formed string dictionaries must not raise.
    _validate_bundle_label_selector(
        [
            {"ray.io/market_type": "spot"},
            {"ray.io/accelerator-type": "A100"},
        ]
    )

    # Anything that is not a list must raise.
    with pytest.raises(ValueError, match="must be a list"):
        _validate_bundle_label_selector("not a list")

    # An empty list must not raise (interpreted as a no-op).
    _validate_bundle_label_selector([])

    # Each remaining input must raise ValueError matching the given pattern.
    rejected_cases = [
        # List containing a non-dictionary element.
        (["not a dict", {"valid": "label"}], "must be a list of string dictionary"),
        # Dictionary with a non-string key.
        ([{1: "value"}, {"key": "val"}], "must be a list of string dictionary"),
        # Dictionary with a non-string value.
        ([{"key": 123}, {"valid": "label"}], "must be a list of string dictionary"),
        # Invalid label key syntax (delegated to validate_label_selector).
        ([{"INVALID key!": "value"}], "Invalid label selector provided"),
    ]
    for selectors, message_pattern in rejected_cases:
        with pytest.raises(ValueError, match=message_pattern):
            _validate_bundle_label_selector(selectors)


if __name__ == "__main__":
    # Run this file's tests under pytest and exit with pytest's return value.
    exit_code = pytest.main(["-sv", __file__])
    sys.exit(exit_code)
Loading