Merged
39 commits
1233494
Pass node labels from gcs to autoscaler and use label_selector in com…
ryanaoleary Jun 5, 2025
94b61fb
Update test
ryanaoleary Jun 5, 2025
247af51
Fix pg_usage_labels test
ryanaoleary Jun 13, 2025
e603c00
Merge branch 'master' into autoscaler-labels
ryanaoleary Jun 13, 2025
f991648
Propogate labelselector info
ryanaoleary Jun 24, 2025
b51d643
Pass label selector info and add/update tests
ryanaoleary Jun 25, 2025
161a8a4
Merge branch 'master' into autoscaler-labels
ryanaoleary Jun 25, 2025
9d00796
Update c++ test to check all operators
ryanaoleary Jun 25, 2025
f6528bb
clang format
ryanaoleary Jun 25, 2025
8adaa0e
Merge branch 'master' into autoscaler-labels
ryanaoleary Jun 25, 2025
7d593ea
Fix c++ test
ryanaoleary Jun 30, 2025
bb9420f
Move LabelSelector message to common proto and add scheduling descriptor
ryanaoleary Jul 2, 2025
8cb1816
Fix scheduling logic and add e2e test
ryanaoleary Jul 9, 2025
18aa109
Fix remaining comments
ryanaoleary Jul 9, 2025
ece9c9d
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 9, 2025
55c94f0
Fix FillAggregateLoad function
ryanaoleary Jul 9, 2025
9871164
Fix microcheck
ryanaoleary Jul 9, 2025
509c29a
Update python/ray/autoscaler/v2/utils.py
ryanaoleary Jul 14, 2025
72aa01d
Make label score first priority
ryanaoleary Jul 14, 2025
d84b113
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 15, 2025
202717b
Update label selector score comment
ryanaoleary Jul 16, 2025
727486b
Add tests for fallback and scoring, and add label selector hashing
ryanaoleary Jul 21, 2025
c0a3f2c
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 23, 2025
c1a0eb4
Fix lint
ryanaoleary Jul 24, 2025
0e8bf9f
Undo pydoclint change
ryanaoleary Jul 24, 2025
20d0737
Update src/ray/common/scheduling/label_selector.h
ryanaoleary Jul 29, 2025
f799d11
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
ryanaoleary Jul 29, 2025
49e1ef4
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
ryanaoleary Jul 29, 2025
8a06698
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h
ryanaoleary Jul 29, 2025
65266a2
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
ryanaoleary Jul 29, 2025
d0270ca
Update src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
ryanaoleary Jul 29, 2025
7a04a3e
Update src/ray/gcs/gcs_server/gcs_resource_manager.cc
ryanaoleary Jul 29, 2025
9b3bbbe
Remove friend tests, use absl hash, fix nit comments
ryanaoleary Jul 29, 2025
1af13a1
Use move() and reserve vector length
ryanaoleary Jul 29, 2025
701b97a
remove erroneous duplicate for loop
ryanaoleary Jul 29, 2025
c1a302c
Remove pydoclint baseline change
ryanaoleary Jul 29, 2025
c8bc79a
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 29, 2025
a4ee6c4
Merge branch 'master' into autoscaler-labels
ryanaoleary Jul 29, 2025
4741c40
Fix pdyoclint-baseline
ryanaoleary Jul 29, 2025
23 changes: 0 additions & 23 deletions ci/lint/pydoclint-baseline.txt
@@ -263,22 +263,6 @@ python/ray/_private/test_utils.py
DOC101: Function `monitor_memory_usage`: Docstring contains fewer arguments than in function signature.
DOC103: Function `monitor_memory_usage`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [print_interval_s: int, record_interval_s: int]. Arguments in the docstring but not in the function signature: [interval_s: ].
--------------------
python/ray/_private/usage/usage_lib.py
DOC201: Function `record_extra_usage_tag` does not have a return section in docstring
DOC201: Function `_generate_cluster_metadata` does not have a return section in docstring
DOC106: Function `put_cluster_metadata`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature
DOC107: Function `put_cluster_metadata`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
DOC101: Function `get_extra_usage_tags_to_report`: Docstring contains fewer arguments than in function signature.
DOC106: Function `get_extra_usage_tags_to_report`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature
DOC107: Function `get_extra_usage_tags_to_report`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
DOC103: Function `get_extra_usage_tags_to_report`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [gcs_client: ].
DOC106: Function `_get_cluster_status_to_report_v2`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature
DOC107: Function `_get_cluster_status_to_report_v2`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
DOC106: Function `get_cluster_status_to_report`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature
DOC107: Function `get_cluster_status_to_report`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
DOC106: Function `get_cluster_metadata`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature
DOC107: Function `get_cluster_metadata`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
--------------------
python/ray/_private/utils.py
DOC101: Function `format_error_message`: Docstring contains fewer arguments than in function signature.
DOC103: Function `format_error_message`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [task_exception: bool].
@@ -766,8 +750,6 @@ python/ray/autoscaler/v2/utils.py
DOC107: Method `ProtobufUtil.to_dict`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
DOC106: Method `ProtobufUtil.to_dict_list`: The option `--arg-type-hints-in-signature` is `True` but there are no argument type hints in the signature
DOC107: Method `ProtobufUtil.to_dict_list`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
DOC101: Method `ResourceRequestUtil.make`: Docstring contains fewer arguments than in function signature.
DOC103: Method `ResourceRequestUtil.make`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [constraints: Optional[List[Tuple[PlacementConstraintType, str, str]]]].
DOC103: Method `ClusterStatusFormatter._constraint_report`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [cluster_constraint_demand: List[ClusterConstraintDemand]]. Arguments in the docstring but not in the function signature: [data: ].
--------------------
python/ray/client_builder.py
@@ -1571,11 +1553,6 @@ python/ray/llm/_internal/common/utils/download_utils.py
python/ray/llm/_internal/serve/configs/openai_api_models.py
DOC201: Function `to_model_metadata` does not have a return section in docstring
--------------------
python/ray/llm/_internal/serve/deployments/llm/multiplex/utils.py
DOC201: Function `retry_with_exponential_backoff` does not have a return section in docstring
DOC101: Function `get_object_from_cloud`: Docstring contains fewer arguments than in function signature.
DOC103: Function `get_object_from_cloud`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [object_uri: str].
--------------------
python/ray/llm/_internal/serve/deployments/routers/router.py
DOC101: Method `LLMRouter.completions`: Docstring contains fewer arguments than in function signature.
DOC103: Method `LLMRouter.completions`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [body: CompletionRequest].
16 changes: 16 additions & 0 deletions python/ray/autoscaler/v2/event_logger.py
@@ -11,6 +11,7 @@
GangResourceRequest,
ResourceRequest,
)
from ray.core.generated.common_pb2 import LabelSelectorOperator
from ray.core.generated.instance_manager_pb2 import LaunchRequest, TerminationRequest

logger = logging.getLogger(__name__)
@@ -105,6 +106,21 @@ def log_cluster_scheduling_update(
if idx < len(requests_by_count) - 1:
log_str += ", "

# Parse and log label selectors if present
if req_count.request.label_selectors:
selector_strs = []
for selector in req_count.request.label_selectors:
for constraint in selector.label_constraints:
op = LabelSelectorOperator.Name(constraint.operator)
values = ",".join(constraint.label_values)
selector_strs.append(
f"{constraint.label_key} {op} [{values}]"
)
if selector_strs:
log_str += (
" with label selectors: [" + "; ".join(selector_strs) + "]"
)

log_str += (
". Add suitable node types to this cluster to resolve this issue."
)
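For reference, a rough standalone sketch of the string the new logging code above builds (plain dicts stand in for the LabelSelector protobuf messages; the example key, values, and base log message are made up):

selectors = [
    {
        "label_constraints": [
            {
                "label_key": "accelerator-type",
                "operator": "LABEL_OPERATOR_IN",  # stands in for LabelSelectorOperator.Name(...)
                "label_values": ["A100", "H100"],
            }
        ]
    }
]

selector_strs = []
for selector in selectors:
    for constraint in selector["label_constraints"]:
        values = ",".join(constraint["label_values"])
        selector_strs.append(f"{constraint['label_key']} {constraint['operator']} [{values}]")

log_str = "No suitable node types to satisfy the request"  # illustrative base message
if selector_strs:
    log_str += " with label selectors: [" + "; ".join(selector_strs) + "]"
print(log_str)
# No suitable node types to satisfy the request with label selectors: [accelerator-type LABEL_OPERATOR_IN [A100,H100]]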
78 changes: 67 additions & 11 deletions python/ray/autoscaler/v2/scheduler.py
Contributor:

Nit: I think we should also change the _sort_resource_request logic in _try_schedule to add labels to the sorting mechanism, as there can be resource requests with the exact same resource requirements but different label selectors.

Contributor Author:

Sounds good to me, done in 8cb1816. I had it sort by the length of the constraints of the first selector, similar to how we sort based on the length of the placement constraints.

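Roughly, the effect of that sorting tweak can be sketched standalone as below (plain dicts stand in for ResourceRequest protos; the bundles and label keys are made up, and the sort direction the scheduler actually applies isn't visible in this diff):

requests = [
    {"placement_constraints": [], "label_selectors": [],
     "resources_bundle": {"CPU": 1}},
    {"placement_constraints": [],
     "label_selectors": [{"label_constraints": [
         {"label_key": "accelerator-type", "label_values": ["A100"]}]}],
     "resources_bundle": {"CPU": 1}},
]

def sort_key(req):
    # Mirrors _sort_resource_request in the diff below: placement constraints
    # first, then the length of the first label selector's constraints.
    label_constraint_len = (
        len(req["label_selectors"][0]["label_constraints"])
        if req["label_selectors"] else 0
    )
    return (
        len(req["placement_constraints"]),
        label_constraint_len,
        len(req["resources_bundle"]),
        sum(req["resources_bundle"].values()),
        sorted(req["resources_bundle"].items()),
    )

# Identical bundles, but the labeled request now gets a distinct, deterministic
# position instead of depending on input order.
print([bool(r["label_selectors"]) for r in sorted(requests, key=sort_key)])
# [False, True]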
@@ -20,6 +20,7 @@
from ray.autoscaler.v2.instance_manager.config import NodeTypeConfig
from ray.autoscaler.v2.schema import AutoscalerInstance, NodeType
from ray.autoscaler.v2.utils import ProtobufUtil, ResourceRequestUtil
from ray.core.generated.common_pb2 import LabelSelectorOperator
from ray.core.generated.autoscaler_pb2 import (
ClusterResourceConstraint,
GangResourceRequest,
@@ -275,8 +276,10 @@ def new(
# Available resources for scheduling requests of different
# sources.
available_resources=dict(instance.ray_node.available_resources),
# Use ray node's dynamic labels.
labels=dict(instance.ray_node.dynamic_labels),
labels={
**(instance.ray_node.labels or {}),
**(instance.ray_node.dynamic_labels or {}),
},
status=SchedulingNodeStatus.SCHEDULABLE,
im_instance_id=instance.im_instance.instance_id,
im_instance_status=instance.im_instance.status,
@@ -437,16 +440,22 @@ def _compute_score(
A "higher" score means that this node is more suitable for scheduling the
current scheduled resource requests.

The score is a tuple of 4 values:
1. Whether this node is a GPU node and the current resource request has
The score is a tuple of 5 values:
1. Whether this node has labels matching the current resource request's
label_selector requirements:
0: if this node does not satisfy any label selector requirements or
no label selectors are provided.
len(label_selectors)-i: a score based on the priority of the label
selector in the resource request that this node satisfies.
2. Whether this node is a GPU node and the current resource request has
GPU requirements:
0: if this node is a GPU node and the current resource request
placed onto the node has no GPU requirements.
1: if this node is not a GPU node or the current resource request
placed onto the node has GPU requirements.
2. The number of resource types being scheduled.
3. The minimum utilization rate across all resource types.
4. The average utilization rate across all resource types.
3. The number of resource types being scheduled.
4. The minimum utilization rate across all resource types.
5. The average utilization rate across all resource types.

NOTE:
This function is adapted from _resource_based_utilization_scorer from
@@ -499,11 +508,15 @@
if is_gpu_node and not any_gpu_requests:
gpu_ok = False

# Check if node satisfies label requirements.
matches_labels = self._satisfies_label_constraints(sched_requests)

# Prioritize avoiding gpu nodes for non-gpu workloads first,
# then prioritize matching multiple resource types,
# then prioritize using all resources,
# then prioritize overall balance of multiple resources.
return (
matches_labels,
gpu_ok,
num_matching_resource_types,
min(util_by_resources) if util_by_resources else 0,
@@ -512,6 +525,37 @@
else 0,
)

def _satisfies_label_constraints(
self, sched_requests: List[ResourceRequest]
) -> int:
"""Returns a higher value based on the priority of the label selector this node
satisfies (first returns highest score, decreasing sequentially for fallback), 0 otherwise."""
for req in sched_requests:
num_selectors = len(req.label_selectors)
for i, selector in enumerate(req.label_selectors):
all_constraints_pass = True
for constraint in selector.label_constraints:
key = constraint.label_key
values = set(constraint.label_values)
op = constraint.operator
node_val = self.labels.get(key)

if op == LabelSelectorOperator.LABEL_OPERATOR_IN:
if node_val not in values:
all_constraints_pass = False
break
elif op == LabelSelectorOperator.LABEL_OPERATOR_NOT_IN:
if node_val in values:
all_constraints_pass = False
break
else:
all_constraints_pass = False
break

if all_constraints_pass:
return num_selectors - i
return 0

def _try_schedule_one(
self, request: ResourceRequest, resource_request_source: ResourceRequestSource
) -> bool:
@@ -528,6 +572,11 @@ def _try_schedule_one(
True if the resource request is scheduled on this node.
"""

# Enforce label selector constraints
if request.label_selectors:
if self._satisfies_label_constraints([request]) == 0:
return False # Node doesn't satisfy any label selector in request.

# Check if there's placement constraints that are not satisfied.
for constraint in request.placement_constraints:
if constraint.HasField("anti_affinity"):
@@ -1347,17 +1396,24 @@ def _try_schedule(
def _sort_resource_request(req: ResourceRequest) -> Tuple:
"""
Sort the resource requests by:
1. The length of it's placement constraints.
2. The number of resources it requests.
3. The values of resources it requests.
4. lexicographically for each resource (for stable ordering)
1. The length of its placement constraints.
2. The length of its first label selector constraints (if any).
3. The number of resources it requests.
4. The values of resources it requests.
5. lexicographically for each resource (for stable ordering)

This is a legacy sorting function for the autoscaler's binpacking
algo - we do this so that we could have a deterministic scheduling
results with reasonable fragmentation.
"""
label_constraint_len = (
len(req.label_selectors[0].label_constraints)
if req.label_selectors
else 0
)
return (
len(req.placement_constraints),
label_constraint_len,
len(req.resources_bundle.values()),
sum(req.resources_bundle.values()),
sorted(req.resources_bundle.items()),
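As a rough standalone sketch of the matching-and-fallback behavior introduced above (plain dicts instead of protobufs; the node labels are made up and the "in"/"!in" strings are illustrative stand-ins for the LABEL_OPERATOR_IN / LABEL_OPERATOR_NOT_IN enum values):

node_labels = {"accelerator-type": "TPU", "market-type": "spot"}

# Two selectors in priority order: prefer A100 nodes, fall back to TPU nodes.
label_selectors = [
    {"label_constraints": [{"label_key": "accelerator-type",
                            "operator": "in", "label_values": ["A100"]}]},
    {"label_constraints": [{"label_key": "accelerator-type",
                            "operator": "in", "label_values": ["TPU"]}]},
]

def label_score(selectors, labels):
    # Simplified version of _satisfies_label_constraints for a single request:
    # the highest-priority selector the node satisfies yields the highest score,
    # fallback selectors score lower, and 0 means no selector matches.
    for i, selector in enumerate(selectors):
        ok = True
        for c in selector["label_constraints"]:
            val = labels.get(c["label_key"])
            if c["operator"] == "in" and val not in c["label_values"]:
                ok = False
                break
            if c["operator"] == "!in" and val in c["label_values"]:
                ok = False
                break
        if ok:
            return len(selectors) - i
    return 0

print(label_score(label_selectors, node_labels))  # 1: only the TPU fallback matches

Since this label score is the first element of the _compute_score tuple, a node matching a higher-priority selector wins ties ahead of the GPU and utilization heuristics.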
2 changes: 2 additions & 0 deletions python/ray/autoscaler/v2/schema.py
@@ -58,6 +58,8 @@ class NodeInfo:
details: Optional[str] = None
# Activity on the node.
node_activity: Optional[List[str]] = None
# Ray node labels.
labels: Optional[Dict[str, str]] = None

def total_resources(self) -> Dict[str, float]:
if self.resource_usage is None:
101 changes: 101 additions & 0 deletions python/ray/autoscaler/v2/tests/test_e2e.py
@@ -526,6 +526,107 @@ def nodes_up():
cluster.shutdown()


@pytest.mark.parametrize("autoscaler_v2", [True])
def test_task_scheduled_on_node_with_label_selector(autoscaler_v2):
cluster = AutoscalingCluster(
head_resources={"CPU": 0},
worker_node_types={
"node1": {
"resources": {"CPU": 1},
"node_config": {},
"labels": {"accelerator-type": "A100", "market-type": "spot"},
"min_workers": 0,
"max_workers": 1,
},
"node2": {
"resources": {"CPU": 1},
"node_config": {},
"labels": {
"region": "us-east1",
"accelerator-type": "TPU",
"market-type": "spot",
},
"min_workers": 0,
"max_workers": 1,
},
"node3": {
"resources": {"CPU": 1},
"node_config": {},
"labels": {"accelerator-type": "B200", "market-type": "spot"},
"min_workers": 0,
"max_workers": 1,
},
"node4": {
"resources": {"CPU": 1},
"node_config": {},
"labels": {"market-type": "on-demand", "accelerator-type": "TPU"},
"min_workers": 0,
"max_workers": 1,
},
},
idle_timeout_minutes=999,
autoscaler_v2=autoscaler_v2,
)

driver_script = """
import ray
import time

@ray.remote(num_cpus=1, label_selector={"accelerator-type": "A100"})
def task1():
print("Running task1")
time.sleep(60)
return True

@ray.remote(num_cpus=1, label_selector={"region": "in(us-east1,me-central1)"})
def task2():
print("Running task2")
time.sleep(60)
return True

@ray.remote(num_cpus=1, label_selector={"accelerator-type": "!in(A100,TPU)"})
def task3():
print("Running task3")
time.sleep(60)
return True

@ray.remote(num_cpus=1, label_selector={"market-type": "!in(spot)"})
def task4():
print("Running task4")
time.sleep(60)
return True

ray.init("auto")
assert(ray.get([task1.remote(), task2.remote(), task3.remote(), task4.remote()]))
"""

try:
cluster.start()
ray.init("auto")
gcs_address = ray.get_runtime_context().gcs_address
expected_nodes = 4

def tasks_run():
tasks = list_tasks()
assert len(tasks) > 0
return True

run_string_as_driver_nonblocking(driver_script)
wait_for_condition(tasks_run)

def all_tasks_scheduled():
status = get_cluster_status(gcs_address)
return len(status.active_nodes) == expected_nodes

# All tasks with label selectors should be scheduled, scaling
# 4 nodes with the required labels.
wait_for_condition(all_tasks_scheduled, timeout=60)

finally:
ray.shutdown()
cluster.shutdown()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))