Skip to content

Commit ed49a53

Browse files
[Core] Add fallback strategy scheduling logic (#56369)
This PR also updates the cluster resource scheduler logic to account for the list of `LabelSelector`s specified by the `fallback_strategy`, falling back to each fallback strategy `LabelSelector` in-order until one is satisfied when selecting the best node. We're able to support fallback selectors by considering them in the cluster resource scheduler in-order using the existing label selector logic in `IsFeasible` and `IsAvailable`, returning the first valid node returned by `GetBestSchedulableNode`. #51564 --------- Signed-off-by: Ryan O'Leary <[email protected]> Signed-off-by: Ryan O'Leary <[email protected]> Co-authored-by: Mengjin Yan <[email protected]>
1 parent fef5fa6 commit ed49a53

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1364
-130
lines changed

python/ray/_raylet.pyx

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,11 @@ from ray.includes.common cimport (
8787
CRayStatus,
8888
CActorTableData,
8989
CErrorTableData,
90+
CFallbackOption,
9091
CGcsClientOptions,
9192
CGcsNodeInfo,
9293
CJobTableData,
94+
CLabelSelector,
9395
CLogBatch,
9496
CTaskArg,
9597
CTaskArgByReference,
@@ -835,12 +837,13 @@ cdef int prepare_labels(
835837

836838
cdef int prepare_label_selector(
837839
dict label_selector_dict,
838-
unordered_map[c_string, c_string] *label_selector) except -1:
840+
CLabelSelector *c_label_selector) except -1:
841+
842+
c_label_selector[0] = CLabelSelector()
839843

840844
if label_selector_dict is None:
841845
return 0
842846

843-
label_selector[0].reserve(len(label_selector_dict))
844847
for key, value in label_selector_dict.items():
845848
if not isinstance(key, str):
846849
raise ValueError(f"Label selector key type must be string, but got {type(key)}")
@@ -853,10 +856,39 @@ cdef int prepare_label_selector(
853856
inner = value[value.index("(")+1:-1].strip()
854857
if not inner:
855858
raise ValueError(f"No values provided for Label Selector '{value[:value.index('(')]}' operator on key '{key}'.")
856-
label_selector[0][key.encode("utf-8")] = value.encode("utf-8")
859+
# Add key-value constraint to the LabelSelector object.
860+
c_label_selector[0].AddConstraint(key.encode("utf-8"), value.encode("utf-8"))
857861

858862
return 0
859863

864+
cdef int prepare_fallback_strategy(
865+
list fallback_strategy,
866+
c_vector[CFallbackOption] *fallback_strategy_vector) except -1:
867+
868+
cdef dict label_selector_dict
869+
cdef CLabelSelector c_label_selector
870+
871+
if fallback_strategy is None:
872+
return 0
873+
874+
for strategy_dict in fallback_strategy:
875+
if not isinstance(strategy_dict, dict):
876+
raise ValueError(
877+
"Fallback strategy must be a list of dicts, "
878+
f"but got list containing {type(strategy_dict)}")
879+
880+
label_selector_dict = strategy_dict.get("label_selector")
881+
882+
if label_selector_dict is not None and not isinstance(label_selector_dict, dict):
883+
raise ValueError("Invalid fallback strategy element: invalid 'label_selector'.")
884+
885+
prepare_label_selector(label_selector_dict, &c_label_selector)
886+
887+
fallback_strategy_vector.push_back(
888+
CFallbackOption(c_label_selector)
889+
)
890+
891+
return 0
860892

861893
cdef int prepare_resources(
862894
dict resource_dict,
@@ -3644,11 +3676,13 @@ cdef class CoreWorker:
36443676
int64_t generator_backpressure_num_objects,
36453677
c_bool enable_task_events,
36463678
labels,
3647-
label_selector):
3679+
label_selector,
3680+
fallback_strategy):
36483681
cdef:
36493682
unordered_map[c_string, double] c_resources
36503683
unordered_map[c_string, c_string] c_labels
3651-
unordered_map[c_string, c_string] c_label_selector
3684+
CLabelSelector c_label_selector
3685+
c_vector[CFallbackOption] c_fallback_strategy
36523686
CRayFunction ray_function
36533687
CTaskOptions task_options
36543688
c_vector[unique_ptr[CTaskArg]] args_vector
@@ -3675,6 +3709,7 @@ cdef class CoreWorker:
36753709
prepare_resources(resources, &c_resources)
36763710
prepare_labels(labels, &c_labels)
36773711
prepare_label_selector(label_selector, &c_label_selector)
3712+
prepare_fallback_strategy(fallback_strategy, &c_fallback_strategy)
36783713
ray_function = CRayFunction(
36793714
language.lang, function_descriptor.descriptor)
36803715
prepare_args_and_increment_put_refs(
@@ -3691,7 +3726,8 @@ cdef class CoreWorker:
36913726
c_label_selector,
36923727
# `tensor_transport` is currently only supported in Ray Actor tasks.
36933728
# For Ray tasks, we always use `OBJECT_STORE`.
3694-
TENSOR_TRANSPORT_OBJECT_STORE)
3729+
TENSOR_TRANSPORT_OBJECT_STORE,
3730+
c_fallback_strategy)
36953731

36963732
current_c_task_id = current_task.native()
36973733

@@ -3742,6 +3778,7 @@ cdef class CoreWorker:
37423778
label_selector,
37433779
c_bool allow_out_of_order_execution,
37443780
c_bool enable_tensor_transport,
3781+
fallback_strategy,
37453782
):
37463783
cdef:
37473784
CRayFunction ray_function
@@ -3755,7 +3792,8 @@ cdef class CoreWorker:
37553792
c_vector[CObjectID] incremented_put_arg_ids
37563793
optional[c_bool] is_detached_optional = nullopt
37573794
unordered_map[c_string, c_string] c_labels
3758-
unordered_map[c_string, c_string] c_label_selector
3795+
CLabelSelector c_label_selector
3796+
c_vector[CFallbackOption] c_fallback_strategy
37593797
c_string call_site
37603798

37613799
self.python_scheduling_strategy_to_c(
@@ -3770,6 +3808,7 @@ cdef class CoreWorker:
37703808
prepare_resources(placement_resources, &c_placement_resources)
37713809
prepare_labels(labels, &c_labels)
37723810
prepare_label_selector(label_selector, &c_label_selector)
3811+
prepare_fallback_strategy(fallback_strategy, &c_fallback_strategy)
37733812
ray_function = CRayFunction(
37743813
language.lang, function_descriptor.descriptor)
37753814
prepare_args_and_increment_put_refs(
@@ -3799,7 +3838,8 @@ cdef class CoreWorker:
37993838
enable_tensor_transport,
38003839
enable_task_events,
38013840
c_labels,
3802-
c_label_selector),
3841+
c_label_selector,
3842+
c_fallback_strategy),
38033843
extension_data,
38043844
call_site,
38053845
&c_actor_id,
@@ -3914,9 +3954,10 @@ cdef class CoreWorker:
39143954
c_string serialized_retry_exception_allowlist
39153955
c_string serialized_runtime_env = b"{}"
39163956
unordered_map[c_string, c_string] c_labels
3917-
unordered_map[c_string, c_string] c_label_selector
3957+
CLabelSelector c_label_selector
39183958
c_string call_site
39193959
CTensorTransport c_tensor_transport_val
3960+
c_vector[CFallbackOption] c_fallback_strategy
39203961

39213962
serialized_retry_exception_allowlist = serialize_retry_exception_allowlist(
39223963
retry_exception_allowlist,
@@ -3953,7 +3994,8 @@ cdef class CoreWorker:
39533994
enable_task_events,
39543995
c_labels,
39553996
c_label_selector,
3956-
c_tensor_transport_val),
3997+
c_tensor_transport_val,
3998+
c_fallback_strategy),
39573999
max_retries,
39584000
retry_exceptions,
39594001
serialized_retry_exception_allowlist,

python/ray/actor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1807,6 +1807,7 @@ def _remote(self, args=None, kwargs=None, **actor_options) -> ActorProxy[T]:
18071807
enable_task_events=enable_task_events,
18081808
labels=actor_options.get("_labels"),
18091809
label_selector=actor_options.get("label_selector"),
1810+
fallback_strategy=actor_options.get("fallback_strategy"),
18101811
allow_out_of_order_execution=allow_out_of_order_execution,
18111812
enable_tensor_transport=meta.enable_tensor_transport,
18121813
)

python/ray/autoscaler/v2/tests/test_e2e.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,7 @@ def all_nodes_launched():
654654
nodes = {node["NodeID"]: node["Labels"] for node in ray.nodes()}
655655
task_selectors = {
656656
"task_0": {"accelerator-type": "A100"},
657-
"task_1": {"region": "in(us-east1,me-central1)"},
657+
"task_1": {"region": "in(me-central1,us-east1)"},
658658
"task_2": {"accelerator-type": "!in(A100,TPU)"},
659659
"task_3": {"market-type": "!spot"},
660660
}
@@ -799,7 +799,7 @@ def all_actors_scheduled():
799799
nodes = {node["NodeID"]: node["Labels"] for node in ray.nodes()}
800800
actor_selectors = {
801801
"actor_0": {"accelerator-type": "A100"},
802-
"actor_1": {"region": "in(us-east1,me-central1)"},
802+
"actor_1": {"region": "in(me-central1,us-east1)"},
803803
"actor_2": {"accelerator-type": "!in(A100,TPU)"},
804804
"actor_3": {"market-type": "!spot"},
805805
}
@@ -900,9 +900,22 @@ def test_pg_scheduled_on_node_with_bundle_label_selector(autoscaler_v2):
900900
ray.get(pg.ready())
901901

902902
# Validate the number and types of the auto-scaled nodes are as expected.
903-
status = get_cluster_status(gcs_address)
904-
assert len(status.active_nodes) == expected_nodes
903+
# Add a wait here to avoid flaky test behavior.
904+
def check_nodes_active():
905+
status = get_cluster_status(gcs_address)
906+
return len(status.active_nodes) == expected_nodes
905907

908+
try:
909+
wait_for_condition(check_nodes_active, timeout=30, retry_interval_ms=500)
910+
except Exception as e:
911+
latest_status = get_cluster_status(gcs_address)
912+
raise AssertionError(
913+
f"Timed out waiting for {expected_nodes} active nodes. "
914+
f"Got: {len(latest_status.active_nodes)}. "
915+
f"Full status: {latest_status}"
916+
) from e
917+
918+
status = get_cluster_status(gcs_address)
906919
actual_node_types = {node.ray_node_type_name for node in status.active_nodes}
907920
expected_node_types = {"a100_node", "tpu_node"}
908921
assert actual_node_types == expected_node_types

python/ray/includes/common.pxd

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,17 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
239239
CLineageReconstructionTask()
240240
const c_string &SerializeAsString() const
241241

242+
cdef extern from "ray/common/scheduling/label_selector.h" namespace "ray":
243+
cdef cppclass CLabelSelector "ray::LabelSelector":
244+
CLabelSelector() nogil except +
245+
void AddConstraint(const c_string& key, const c_string& value) nogil except +
246+
247+
cdef extern from "ray/common/scheduling/fallback_strategy.h" namespace "ray":
248+
cdef cppclass CFallbackOption "ray::FallbackOption":
249+
CLabelSelector label_selector
250+
251+
CFallbackOption() nogil except +
252+
CFallbackOption(CLabelSelector) nogil except +
242253

243254
# This is a workaround for C++ enum class since Cython has no corresponding
244255
# representation.
@@ -346,8 +357,9 @@ cdef extern from "ray/core_worker/common.h" nogil:
346357
c_string serialized_runtime_env,
347358
c_bool enable_task_events,
348359
const unordered_map[c_string, c_string] &labels,
349-
const unordered_map[c_string, c_string] &label_selector,
350-
CTensorTransport tensor_transport)
360+
CLabelSelector label_selector,
361+
CTensorTransport tensor_transport,
362+
c_vector[CFallbackOption] fallback_strategy)
351363

352364
cdef cppclass CActorCreationOptions "ray::core::ActorCreationOptions":
353365
CActorCreationOptions()
@@ -368,7 +380,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
368380
c_bool enable_tensor_transport,
369381
c_bool enable_task_events,
370382
const unordered_map[c_string, c_string] &labels,
371-
const unordered_map[c_string, c_string] &label_selector)
383+
CLabelSelector label_selector,
384+
c_vector[CFallbackOption] fallback_strategy)
372385

373386
cdef cppclass CPlacementGroupCreationOptions \
374387
"ray::core::PlacementGroupCreationOptions":

python/ray/remote_function.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class RemoteFunction:
5858
_memory: The heap memory request in bytes for this task/actor,
5959
rounded down to the nearest integer.
6060
_label_selector: The label requirements on a node for scheduling of the task or actor.
61+
_fallback_strategy: Soft constraints of a list of decorator options to fall back on when scheduling on a node.
6162
_resources: The default custom resource requirements for invocations of
6263
this remote function.
6364
_num_returns: The default number of return values for invocations
@@ -471,6 +472,7 @@ def _remote(
471472
enable_task_events = task_options.get("enable_task_events")
472473
labels = task_options.get("_labels")
473474
label_selector = task_options.get("label_selector")
475+
fallback_strategy = task_options.get("fallback_strategy")
474476

475477
def invocation(args, kwargs):
476478
if self._is_cross_language:
@@ -503,6 +505,7 @@ def invocation(args, kwargs):
503505
enable_task_events,
504506
labels,
505507
label_selector,
508+
fallback_strategy,
506509
)
507510
# Reset worker's debug context from the last "remote" command
508511
# (which applies only to this .remote call).

python/ray/tests/test_label_scheduling.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,5 +96,92 @@ def test_label_selector_multiple(cluster_with_labeled_nodes):
9696
assert ray.get(actor.get_node_id.remote(), timeout=3) == node_3
9797

9898

99+
def test_fallback_strategy(cluster_with_labeled_nodes):
100+
# Create a RayCluster with labelled nodes.
101+
gpu_node, _, _ = cluster_with_labeled_nodes
102+
103+
# Define an unsatisfiable label selector.
104+
infeasible_label_selector = {"ray.io/accelerator-type": "does-not-exist"}
105+
106+
# Create a fallback strategy with multiple accelerator options.
107+
accelerator_fallbacks = [
108+
{"label_selector": {"ray.io/accelerator-type": "A100"}},
109+
{"label_selector": {"ray.io/accelerator-type": "TPU"}},
110+
]
111+
112+
# Attempt to schedule the actor. The scheduler should fail to find a node with the
113+
# primary `label_selector` and fall back to the first available option, 'A100'.
114+
label_selector_actor = MyActor.options(
115+
label_selector=infeasible_label_selector,
116+
fallback_strategy=accelerator_fallbacks,
117+
).remote()
118+
119+
# Assert that the actor was scheduled on the expected node.
120+
assert ray.get(label_selector_actor.get_node_id.remote(), timeout=5) == gpu_node
121+
122+
123+
def test_empty_selector_fallback_strategy(cluster_with_labeled_nodes):
124+
node_1, node_2, node_3 = cluster_with_labeled_nodes
125+
126+
# Define an unsatisfiable label selector.
127+
infeasible_label_selector = {"ray.io/accelerator-type": "does-not-exist"}
128+
129+
# Create a fallback strategy with multiple label selector fallbacks. The
130+
# first fallback option is unsatisfiable, so it falls back to the empty label
131+
# selector option. This fallback should match any node.
132+
accelerator_fallbacks = [
133+
{"label_selector": {"ray.io/accelerator-type": "also-does-not-exist"}},
134+
{"label_selector": {}},
135+
]
136+
137+
label_selector_actor = MyActor.options(
138+
label_selector=infeasible_label_selector,
139+
fallback_strategy=accelerator_fallbacks,
140+
).remote()
141+
142+
# Assert that the actor was scheduled on the expected node.
143+
assert ray.get(label_selector_actor.get_node_id.remote(), timeout=5) in {
144+
node_1,
145+
node_2,
146+
node_3,
147+
}
148+
149+
150+
def test_infeasible_fallback_strategy(cluster_with_labeled_nodes):
151+
# Define an unsatisfiable label selector and fallback strategy.
152+
label_selector = {"ray.io/accelerator-type": "does-not-exist"}
153+
fallback_strategy = [
154+
{"label_selector": {"ray.io/accelerator-type": "does-not-exist-either"}},
155+
{"label_selector": {"ray.io/accelerator-type": "also-nonexistant"}},
156+
]
157+
158+
# Attempt to schedule the actor, but it should timeout since none of
159+
# the nodes match any label selector.
160+
label_selector_actor = MyActor.options(
161+
label_selector=label_selector, fallback_strategy=fallback_strategy
162+
).remote()
163+
with pytest.raises(TimeoutError):
164+
ray.get(label_selector_actor.get_node_id.remote(), timeout=3)
165+
166+
167+
def test_fallback_with_feasible_primary_selector(cluster_with_labeled_nodes):
168+
gpu_node, _, _ = cluster_with_labeled_nodes
169+
170+
feasible_label_selector = {"ray.io/accelerator-type": "A100"}
171+
feasible_fallback_strategy = [
172+
{"label_selector": {"ray.io/accelerator-type": "B200"}},
173+
]
174+
175+
# Attempt to schedule the actor. The scheduler should use the
176+
# primary selector and ignore the fallback.
177+
label_selector_actor = MyActor.options(
178+
label_selector=feasible_label_selector,
179+
fallback_strategy=feasible_fallback_strategy,
180+
).remote()
181+
182+
# Assert that the actor was scheduled on the expected node and not the fallback.
183+
assert ray.get(label_selector_actor.get_node_id.remote(), timeout=5) == gpu_node
184+
185+
99186
if __name__ == "__main__":
100187
sys.exit(pytest.main(["-sv", __file__]))

src/ray/common/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ ray_cc_library(
230230
":ray_config",
231231
":ray_object",
232232
":runtime_env",
233+
"//src/ray/common/scheduling:fallback_strategy",
233234
"//src/ray/common/scheduling:label_selector",
234235
"//src/ray/common/scheduling:resource_set",
235236
"//src/ray/common/scheduling:scheduling_class_util",

0 commit comments

Comments
 (0)