Merged
Changes from 11 commits
Commits
39 commits
1233494
Pass node labels from gcs to autoscaler and use label_selector in com…
ryanaoleary Jun 5, 2025
94b61fb
Update test
ryanaoleary Jun 5, 2025
247af51
Fix pg_usage_labels test
ryanaoleary Jun 13, 2025
e603c00
Merge branch 'master' into autoscaler-labels
ryanaoleary Jun 13, 2025
f991648
Propagate label selector info
ryanaoleary Jun 24, 2025
b51d643
Pass label selector info and add/update tests
ryanaoleary Jun 25, 2025
161a8a4
Merge branch 'master' into autoscaler-labels
ryanaoleary Jun 25, 2025
9d00796
Update c++ test to check all operators
ryanaoleary Jun 25, 2025
f6528bb
clang format
ryanaoleary Jun 25, 2025
8adaa0e
Merge branch 'master' into autoscaler-labels
ryanaoleary Jun 25, 2025
7d593ea
Fix c++ test
ryanaoleary Jun 30, 2025
bb9420f
Move LabelSelector message to common proto and add scheduling descriptor
ryanaoleary Jul 2, 2025
8cb1816
Fix scheduling logic and add e2e test
ryanaoleary Jul 9, 2025
18aa109
Fix remaining comments
ryanaoleary Jul 9, 2025
ece9c9d
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 9, 2025
55c94f0
Fix FillAggregateLoad function
ryanaoleary Jul 9, 2025
9871164
Fix microcheck
ryanaoleary Jul 9, 2025
509c29a
Update python/ray/autoscaler/v2/utils.py
ryanaoleary Jul 14, 2025
72aa01d
Make label score first priority
ryanaoleary Jul 14, 2025
d84b113
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 15, 2025
202717b
Update label selector score comment
ryanaoleary Jul 16, 2025
727486b
Add tests for fallback and scoring, and add label selector hashing
ryanaoleary Jul 21, 2025
c0a3f2c
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 23, 2025
c1a0eb4
Fix lint
ryanaoleary Jul 24, 2025
0e8bf9f
Undo pydoclint change
ryanaoleary Jul 24, 2025
20d0737
Update src/ray/common/scheduling/label_selector.h
ryanaoleary Jul 29, 2025
f799d11
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
ryanaoleary Jul 29, 2025
49e1ef4
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
ryanaoleary Jul 29, 2025
8a06698
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h
ryanaoleary Jul 29, 2025
65266a2
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
ryanaoleary Jul 29, 2025
d0270ca
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
ryanaoleary Jul 29, 2025
7a04a3e
Update src/ray/gcs/gcs_server/gcs_resource_manager.cc
ryanaoleary Jul 29, 2025
9b3bbbe
Remove friend tests, use absl hash, fix nit comments
ryanaoleary Jul 29, 2025
1af13a1
Use move() and reserve vector length
ryanaoleary Jul 29, 2025
701b97a
remove erroneous duplicate for loop
ryanaoleary Jul 29, 2025
c1a302c
Remove pydoclint baseline change
ryanaoleary Jul 29, 2025
c8bc79a
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 29, 2025
a4ee6c4
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 29, 2025
4741c40
Fix pydoclint-baseline
ryanaoleary Jul 29, 2025
79 changes: 76 additions & 3 deletions python/ray/autoscaler/v2/scheduler.py
Contributor


Nit: I think we should also change the _sort_resource_request logic in _try_schedule to add labels to the sorting mechanism, since there can be resource requests with the exact same resource requirements but different label selectors.

Contributor Author


Sounds good to me, done in 8cb1816. I had it sort by the length of the constraints of the first selector, similar to how we sort based on the length of the placement constraints.
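For reference, the sort key described in that reply can be sketched roughly as below. This is illustrative only — plain stand-in objects rather than the actual ResourceRequest protos, and not the exact _sort_resource_request code:

```python
# Hypothetical sketch: count label-selector constraints alongside placement
# constraints so that more-constrained requests are scheduled first.
def sort_key(request):
    num_placement_constraints = len(request.placement_constraints)
    # Length of the first selector's constraint list, 0 if there are no selectors.
    num_label_constraints = (
        len(request.label_selectors[0].label_constraints)
        if request.label_selectors
        else 0
    )
    return (num_placement_constraints, num_label_constraints)

# Usage (descending, so harder-to-place requests are tried first):
# requests.sort(key=sort_key, reverse=True)
```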

@@ -23,6 +23,7 @@
from ray.core.generated.autoscaler_pb2 import (
ClusterResourceConstraint,
GangResourceRequest,
LabelOperator,
ResourceRequest,
ResourceRequestByCount,
)
@@ -275,8 +276,10 @@ def new(
# Available resources for scheduling requests of different
# sources.
available_resources=dict(instance.ray_node.available_resources),
# Use ray node's dynamic labels.
labels=dict(instance.ray_node.dynamic_labels),
labels={
**(instance.ray_node.labels or {}),
**(instance.ray_node.dynamic_labels or {}),
},
status=SchedulingNodeStatus.SCHEDULABLE,
im_instance_id=instance.im_instance.instance_id,
im_instance_status=instance.im_instance.status,
@@ -437,7 +440,7 @@ def _compute_score(
A "higher" score means that this node is more suitable for scheduling the
current scheduled resource requests.

The score is a tuple of 4 values:
The score is a tuple of 5 values:
1. Whether this node is a GPU node and the current resource request has
GPU requirements:
0: if this node is a GPU node and the current resource request
@@ -447,6 +450,11 @@
2. The number of resource types being scheduled.
3. The minimum utilization rate across all resource types.
4. The average utilization rate across all resource types.
5. Whether this node has labels matching the current resource request's
label_selector requirements:
0: if this node does not satisfy the label_selector requirements of any
scheduled request (including when no label_selectors are provided).
1: if this node satisfies at least one label_selector.

NOTE:
This function is adapted from _resource_based_utilization_scorer from
@@ -499,6 +507,9 @@
if is_gpu_node and not any_gpu_requests:
gpu_ok = False

# Check if node satisfies label requirements.
matches_labels = self._satisfies_label_constraints(sched_requests)

# Prioritize avoiding gpu nodes for non-gpu workloads first,
# then prioritize matching multiple resource types,
# then prioritize using all resources,
@@ -510,8 +521,40 @@
float(sum(util_by_resources)) / len(util_by_resources)
if util_by_resources
else 0,
matches_labels,
)

def _satisfies_label_constraints(
self, sched_requests: List[ResourceRequest]
) -> int:
"""Returns 1 if this node satisfies at least one label selector, 0 otherwise."""
for req in sched_requests:
for selector in req.label_selectors:
# A label selector passes only if all constraints are satisfied
all_constraints_pass = True
for constraint in selector.label_constraints:
key = constraint.label_key
values = set(constraint.label_values)
op = constraint.operator
node_val = self.labels.get(key)

if op == LabelOperator.LABEL_OPERATOR_IN:
if node_val not in values:
all_constraints_pass = False
break
elif op == LabelOperator.LABEL_OPERATOR_NOT_IN:
if node_val in values:
all_constraints_pass = False
break
else:
all_constraints_pass = False
break

if all_constraints_pass:
return 1 # One label selector matched

return 0 # No label selectors are satisfied by SchedulingNode

def _try_schedule_one(
self, request: ResourceRequest, resource_request_source: ResourceRequestSource
) -> bool:
@@ -528,6 +571,36 @@ def _try_schedule_one(
True if the resource request is scheduled on this node.
"""

# Enforce label selector constraints
if request.label_selectors:
selector_satisfied = False
for selector in request.label_selectors:
all_constraints_pass = True
for constraint in selector.label_constraints:
key = constraint.label_key
values = set(constraint.label_values)
op = constraint.operator
node_val = self.labels.get(key)

if op == LabelOperator.LABEL_OPERATOR_IN:
if node_val not in values:
all_constraints_pass = False
break
elif op == LabelOperator.LABEL_OPERATOR_NOT_IN:
if node_val in values:
all_constraints_pass = False
break
else:
all_constraints_pass = False
break

if all_constraints_pass:
selector_satisfied = True
break # At least one selector matched

if not selector_satisfied:
return False # Node doesn't satisfy any label selector

# Check if there's placement constraints that are not satisfied.
for constraint in request.placement_constraints:
if constraint.HasField("anti_affinity"):
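For context, the selector semantics enforced above are OR-of-ANDs: a request fits a node if at least one of its label selectors has every constraint satisfied by the node's labels, and a request with no selectors has no label requirement. A minimal, self-contained sketch of that rule (plain dicts and tuples standing in for the protobuf types; only the IN / NOT_IN operators are modeled):

```python
# Sketch of the OR-of-ANDs matching rule, not the scheduler code itself.
# A selector is a list of (key, operator, values) constraints, where the
# operator strings "in" / "not_in" stand in for LABEL_OPERATOR_IN / NOT_IN.
def node_matches(node_labels, label_selectors):
    if not label_selectors:
        return True  # No selectors means no label requirement to enforce.
    for selector in label_selectors:
        if all(
            (node_labels.get(key) in values) == (op == "in")
            for key, op, values in selector
        ):
            return True  # Every constraint in this selector passed.
    return False  # No selector was fully satisfied.

node = {"accelerator": "A100", "region": "us-west"}
assert node_matches(node, [[("accelerator", "in", {"A100", "H100"})]])
assert not node_matches(node, [[("accelerator", "not_in", {"A100"})]])
```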
197 changes: 179 additions & 18 deletions python/ray/autoscaler/v2/tests/test_scheduler.py
@@ -25,6 +25,7 @@
from ray.core.generated.autoscaler_pb2 import (
ClusterResourceConstraint,
GangResourceRequest,
LabelOperator,
NodeState,
NodeStatus,
ResourceRequest,
@@ -1930,64 +1931,67 @@ def try_schedule(node_resources: Dict, requests: List[Dict]) -> Tuple:
infeasible, score = node.try_schedule(requests, source)
return ResourceRequestUtil.to_resource_maps(infeasible), score

assert try_schedule({"CPU": 1}, [{"CPU": 1}]) == ([], (True, 1, 1.0, 1.0))
assert try_schedule({"CPU": 1}, [{"CPU": 1}]) == ([], (True, 1, 1.0, 1.0, 0))

assert try_schedule({"GPU": 4}, [{"GPU": 2}]) == ([], (True, 1, 0.5, 0.5))
assert try_schedule({"GPU": 4}, [{"GPU": 2}]) == ([], (True, 1, 0.5, 0.5, 0))
assert try_schedule({"GPU": 4}, [{"GPU": 1}, {"GPU": 1}]) == (
[],
(True, 1, 0.5, 0.5),
(True, 1, 0.5, 0.5, 0),
)
assert try_schedule({"GPU": 2}, [{"GPU": 2}]) == ([], (True, 1, 2, 2, 0))
assert try_schedule({"GPU": 2}, [{"GPU": 1}, {"GPU": 1}]) == (
[],
(True, 1, 2, 2, 0),
)
assert try_schedule({"GPU": 2}, [{"GPU": 2}]) == ([], (True, 1, 2, 2))
assert try_schedule({"GPU": 2}, [{"GPU": 1}, {"GPU": 1}]) == ([], (True, 1, 2, 2))
assert try_schedule({"GPU": 1}, [{"GPU": 1, "CPU": 1}, {"GPU": 1}]) == (
[{"GPU": 1, "CPU": 1}],
(True, 1, 1, 1),
(True, 1, 1, 1, 0),
)
assert try_schedule({"GPU": 1, "CPU": 1}, [{"GPU": 1, "CPU": 1}, {"GPU": 1}]) == (
[{"GPU": 1}],
(True, 2, 1, 1),
(True, 2, 1, 1, 0),
)
assert try_schedule({"GPU": 2, "TPU": 1}, [{"GPU": 2}]) == ([], (True, 1, 0, 1))
assert try_schedule({"CPU": 64}, [{"CPU": 64}]) == ([], (True, 1, 64, 64))
assert try_schedule({"CPU": 64}, [{"CPU": 32}]) == ([], (True, 1, 8, 8))
assert try_schedule({"GPU": 2, "TPU": 1}, [{"GPU": 2}]) == ([], (True, 1, 0, 1, 0))
assert try_schedule({"CPU": 64}, [{"CPU": 64}]) == ([], (True, 1, 64, 64, 0))
assert try_schedule({"CPU": 64}, [{"CPU": 32}]) == ([], (True, 1, 8, 8, 0))
assert try_schedule({"CPU": 64}, [{"CPU": 16}, {"CPU": 16}]) == (
[],
(True, 1, 8, 8),
(True, 1, 8, 8, 0),
)

# GPU Scores
assert try_schedule({"GPU": 1, "CPU": 1}, [{"CPU": 1}]) == (
[],
(False, 1, 0.0, 0.5),
(False, 1, 0.0, 0.5, 0),
)
assert try_schedule({"GPU": 1, "CPU": 1}, [{"CPU": 1, "GPU": 1}]) == (
[],
(True, 2, 1.0, 1.0),
(True, 2, 1.0, 1.0, 0),
)
assert try_schedule({"GPU": 1, "CPU": 1}, [{"GPU": 1}]) == (
[],
(True, 1, 0.0, 0.5),
(True, 1, 0.0, 0.5, 0),
)

# Zero resources
assert try_schedule({"CPU": 0, "custom": 1}, [{"custom": 1}]) == (
[],
(True, 1, 1, 1),
(True, 1, 1, 1, 0),
)
assert try_schedule({"CPU": 0, "custom": 1}, [{"CPU": 1}]) == (
[{"CPU": 1}],
(True, 0, 0.0, 0.0),
(True, 0, 0.0, 0.0, 0),
)

# Implicit resources
implicit_resource = ray._raylet.IMPLICIT_RESOURCE_PREFIX + "a"
assert try_schedule({"CPU": 1}, [{implicit_resource: 1}]) == (
[],
(True, 0, 0.0, 0.0),
(True, 0, 0.0, 0.0, 0),
)
assert try_schedule({"CPU": 1}, [{implicit_resource: 1}] * 2) == (
[{implicit_resource: 1}],
(True, 0, 0.0, 0.0),
(True, 0, 0.0, 0.0, 0),
)


@@ -2341,6 +2345,163 @@ def get_nodes_for(gang_resource_requests) -> Tuple[Dict, List[List[Dict]]]:
) == ({"p2.8xlarge": 1}, [])


def test_schedule_node_with_matching_labels():
"""
Test that a node with matching labels is considered schedulable and used to satisfy a request
with a label_selector.
"""
scheduler = ResourceDemandScheduler(event_logger)
node_type_configs = {
"labelled_node": NodeTypeConfig(
name="labelled_node",
resources={"CPU": 1},
min_worker_nodes=0,
max_worker_nodes=10,
labels={"accelerator": "A100"},
),
}

# The existing instance has a matching node label.
instance = make_autoscaler_instance(
im_instance=Instance(
instance_type="labelled_node",
status=Instance.RAY_RUNNING,
instance_id="1",
node_id=b"r-1",
),
ray_node=NodeState(
node_id=b"r-1",
ray_node_type_name="labelled_node",
available_resources={"CPU": 1},
total_resources={"CPU": 1},
labels={"accelerator": "A100"},
status=NodeStatus.RUNNING,
),
cloud_instance_id="c-1",
)

# No new nodes should be launched if the existing node satisfies the request.
resource_request = ResourceRequestUtil.make(
{"CPU": 1},
label_selectors=[[("accelerator", LabelOperator.LABEL_OPERATOR_IN, ["A100"])]],
)

request = sched_request(
node_type_configs=node_type_configs,
resource_requests=[resource_request],
instances=[instance],
)
reply = scheduler.schedule(request)
to_launch, _ = _launch_and_terminate(reply)
assert to_launch == {}


def test_scale_up_node_to_satisfy_labels():
"""
Test that a resource request with a label selector scales up a new node with
labels to satisfy the constraint.
"""
scheduler = ResourceDemandScheduler(event_logger)

node_type_configs = {
"tpu_node": NodeTypeConfig(
name="tpu_node",
resources={"CPU": 1},
labels={"accelerator": "TPU"},
min_worker_nodes=0,
max_worker_nodes=10,
),
"gpu_node": NodeTypeConfig(
name="gpu_node",
resources={"CPU": 1},
labels={"accelerator": "A100"},
min_worker_nodes=0,
max_worker_nodes=10,
),
}

# Request: want a node with label "accelerator: A100"
resource_request = ResourceRequestUtil.make(
{"CPU": 1},
label_selectors=[[("accelerator", LabelOperator.LABEL_OPERATOR_IN, ["A100"])]],
)

request = sched_request(
node_type_configs=node_type_configs,
resource_requests=[resource_request],
)

reply = scheduler.schedule(request)
to_launch, _ = _launch_and_terminate(reply)

assert to_launch == {"gpu_node": 1}


def test_pg_with_bundle_infeasible_label_selectors():
"""
Test that placement group scheduling honors bundle_label_selectors.
"""
scheduler = ResourceDemandScheduler(event_logger)
AFFINITY = ResourceRequestUtil.PlacementConstraintType.AFFINITY

node_type_configs = {
"gpu_node": NodeTypeConfig(
name="gpu_node",
resources={"CPU": 4, "GPU": 1},
min_worker_nodes=0,
max_worker_nodes=5,
labels={"accelerator": "A100"},
),
"tpu_node": NodeTypeConfig(
name="tpu_node",
resources={"CPU": 4},
min_worker_nodes=0,
max_worker_nodes=5,
labels={"accelerator": "TPU"},
),
}

# Create ResourceRequests for a placement group where each bundle has different label selectors
gpu_request = ResourceRequestUtil.make(
{"CPU": 2, "GPU": 1},
constraints=[(AFFINITY, "pg-1", "")],
label_selectors=[[("accelerator", LabelOperator.LABEL_OPERATOR_IN, ["A100"])]],
)
tpu_request = ResourceRequestUtil.make(
{"CPU": 2},
constraints=[(AFFINITY, "pg-1", "")],
label_selectors=[[("accelerator", LabelOperator.LABEL_OPERATOR_IN, ["TPU"])]],
)

request = sched_request(
node_type_configs=node_type_configs,
gang_resource_requests=[[gpu_request, tpu_request]],
)

reply = scheduler.schedule(request)
to_launch, _ = _launch_and_terminate(reply)

assert sorted(to_launch) == sorted({"gpu_node": 1, "tpu_node": 1})

# Both bundles require A100 with affinity to the same node, but no single node has enough resources -> infeasible
infeasible_gpu_request = ResourceRequestUtil.make(
{"CPU": 3, "GPU": 1},
constraints=[(AFFINITY, "pg-2", "")],
label_selectors=[[("accelerator", LabelOperator.LABEL_OPERATOR_IN, ["A100"])]],
)

request = sched_request(
node_type_configs=node_type_configs,
gang_resource_requests=[[infeasible_gpu_request, infeasible_gpu_request]],
)

reply = scheduler.schedule(request)
to_launch, _ = _launch_and_terminate(reply)

assert to_launch == {}
assert len(reply.infeasible_gang_resource_requests) == 1


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
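A note for reading the updated score tuples in these tests: scores are compared lexicographically, so with the label-match flag in the last position it only breaks ties after the GPU-affinity and utilization terms. (The commit list above contains a later "Make label score first priority" change, but in the 11-commit snapshot shown here the flag is last.) A quick illustration with plain tuples, not scheduler code:

```python
# Tuples compare element by element, left to right; a higher score wins.
# Layout here: (gpu_ok, num_resource_types, min_util, avg_util, matches_labels)
tied_without_labels = (True, 1, 1.0, 1.0, 0)
tied_with_labels = (True, 1, 1.0, 1.0, 1)
assert tied_with_labels > tied_without_labels  # Label match breaks the tie.

better_utilization = (True, 1, 1.0, 1.0, 0)
worse_utilization_with_labels = (True, 1, 0.5, 0.5, 1)
assert better_utilization > worse_utilization_with_labels  # Utilization still dominates.
```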
4 changes: 3 additions & 1 deletion python/ray/autoscaler/v2/tests/test_sdk.py
@@ -357,7 +357,9 @@ def verify():
state,
[
ExpectedNodeState(
head_node_id, NodeStatus.RUNNING, labels={f"_PG_{pg_id}": ""}
head_node_id,
NodeStatus.RUNNING,
labels={f"_PG_{pg_id}": ""},
),
],
)