diff --git a/src/ray/common/scheduling/BUILD.bazel b/src/ray/common/scheduling/BUILD.bazel index baac27e1096f..b695add72481 100644 --- a/src/ray/common/scheduling/BUILD.bazel +++ b/src/ray/common/scheduling/BUILD.bazel @@ -21,6 +21,7 @@ ray_cc_library( srcs = ["label_selector.cc"], hdrs = ["label_selector.h"], deps = [ + "//src/ray/common:constants", "//src/ray/protobuf:common_cc_proto", "@com_google_absl//absl/container:flat_hash_set", "@com_google_absl//absl/strings", diff --git a/src/ray/common/scheduling/label_selector.h b/src/ray/common/scheduling/label_selector.h index 884d64325975..e0fe689b8436 100644 --- a/src/ray/common/scheduling/label_selector.h +++ b/src/ray/common/scheduling/label_selector.h @@ -14,12 +14,14 @@ #pragma once +#include #include #include #include #include "absl/container/flat_hash_set.h" #include "google/protobuf/map.h" +#include "ray/common/constants.h" #include "src/ray/protobuf/common.pb.h" namespace ray { @@ -104,4 +106,18 @@ H AbslHashValue(H h, const LabelSelector &label_selector) { return h; } +inline std::optional> GetHardNodeAffinityValues( + const LabelSelector &label_selector) { + const std::string hard_affinity_key(kLabelKeyNodeID); + + for (const auto &constraint : label_selector.GetConstraints()) { + if (constraint.GetLabelKey() == hard_affinity_key) { + if (constraint.GetOperator() == LabelSelectorOperator::LABEL_IN) { + return constraint.GetLabelValues(); + } + } + } + return std::nullopt; +} + } // namespace ray diff --git a/src/ray/core_worker/lease_policy.cc b/src/ray/core_worker/lease_policy.cc index f1efd15e951c..f64ea1c1e155 100644 --- a/src/ray/core_worker/lease_policy.cc +++ b/src/ray/core_worker/lease_policy.cc @@ -30,6 +30,17 @@ std::pair LocalityAwareLeasePolicy::GetBestNodeForLease( return std::make_pair(fallback_rpc_address_, false); } + // Node Affinity specified through label selectors has higher + // priority than locality aware scheduling. + if (auto node_id_values = GetHardNodeAffinityValues(spec.GetLabelSelector())) { + for (const auto &node_id_hex : *node_id_values) { + if (auto addr = node_addr_factory_(NodeID::FromHex(node_id_hex))) { + return std::make_pair(addr.value(), false); + } + } + return std::make_pair(fallback_rpc_address_, false); + } + if (spec.IsNodeAffinitySchedulingStrategy()) { // The explicit node affinity scheduling strategy // has higher priority than locality aware scheduling. diff --git a/src/ray/raylet/scheduling/cluster_lease_manager.cc b/src/ray/raylet/scheduling/cluster_lease_manager.cc index 61893fc29de8..c94c03440618 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager.cc +++ b/src/ray/raylet/scheduling/cluster_lease_manager.cc @@ -230,8 +230,11 @@ void ClusterLeaseManager::ScheduleAndGrantLeases() { << lease.GetLeaseSpecification().LeaseId() << " is infeasible?" << is_infeasible; - if (lease.GetLeaseSpecification().IsNodeAffinitySchedulingStrategy() && - !lease.GetLeaseSpecification().GetNodeAffinitySchedulingStrategySoft()) { + auto affinity_values = + GetHardNodeAffinityValues(lease.GetLeaseSpecification().GetLabelSelector()); + if ((lease.GetLeaseSpecification().IsNodeAffinitySchedulingStrategy() && + !lease.GetLeaseSpecification().GetNodeAffinitySchedulingStrategySoft()) || + (affinity_values.has_value() && !affinity_values->empty())) { // This can only happen if the target node doesn't exist or is infeasible. // The lease will never be schedulable in either case so we should fail it. if (cluster_resource_scheduler_.IsLocalNodeWithRaylet()) { @@ -318,7 +321,7 @@ void ClusterLeaseManager::TryScheduleInfeasibleLease() { /*requires_object_store_memory*/ false, &is_infeasible); - // There is no node that has available resources to run the request. + // There is no node that has feasible resources to run the request. // Move on to the next shape. if (is_infeasible) { RAY_LOG(DEBUG) << "No feasible node found for lease " diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.h b/src/ray/raylet/scheduling/cluster_resource_manager.h index 58bde9688105..6f69f67c0e2c 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.h +++ b/src/ray/raylet/scheduling/cluster_resource_manager.h @@ -203,6 +203,7 @@ class ClusterResourceManager { FRIEND_TEST(ClusterResourceSchedulerTest, TestForceSpillback); FRIEND_TEST(ClusterResourceSchedulerTest, AffinityWithBundleScheduleTest); FRIEND_TEST(ClusterResourceSchedulerTest, LabelSelectorIsSchedulableOnNodeTest); + FRIEND_TEST(ClusterResourceSchedulerTest, LabelSelectorHardNodeAffinityTest); friend class raylet::SchedulingPolicyTest; friend class raylet_scheduling_policy::HybridSchedulingPolicyTest; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 2df66334975f..c60bd2d86aa8 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -248,6 +248,7 @@ class ClusterResourceScheduler { FRIEND_TEST(ClusterResourceSchedulerTest, TestForceSpillback); FRIEND_TEST(ClusterResourceSchedulerTest, AffinityWithBundleScheduleTest); FRIEND_TEST(ClusterResourceSchedulerTest, LabelSelectorIsSchedulableOnNodeTest); + FRIEND_TEST(ClusterResourceSchedulerTest, LabelSelectorHardNodeAffinityTest); }; } // end namespace ray diff --git a/src/ray/raylet/scheduling/tests/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/tests/cluster_resource_scheduler_test.cc index 9614fa22a737..52682e2acd48 100644 --- a/src/ray/raylet/scheduling/tests/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/tests/cluster_resource_scheduler_test.cc @@ -1867,6 +1867,104 @@ TEST_F(ClusterResourceSchedulerTest, LabelSelectorIsSchedulableOnNodeTest) { ASSERT_FALSE(is_infeasible); } +TEST_F(ClusterResourceSchedulerTest, LabelSelectorHardNodeAffinityTest) { + // Setup scheduler with two nodes. + absl::flat_hash_map node_resources_map({{ResourceID::CPU(), 1}}); + NodeResources node_resources = CreateNodeResources(node_resources_map); + auto local_node_id = scheduling::NodeID(NodeID::FromRandom().Binary()); + instrumented_io_context io_context; + ClusterResourceScheduler resource_scheduler( + io_context, local_node_id, {{"CPU", 0}}, is_node_available_fn_); + + auto node_0_id_obj = NodeID::FromRandom(); + auto node_1_id_obj = NodeID::FromRandom(); + auto node_0 = scheduling::NodeID(node_0_id_obj.Binary()); + auto node_1 = scheduling::NodeID(node_1_id_obj.Binary()); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_0, node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_1, node_resources); + + // Set required node labels. + absl::flat_hash_map node_0_labels = { + {"ray.io/node-id", node_0_id_obj.Hex()}, + }; + absl::flat_hash_map node_1_labels = { + {"ray.io/node-id", node_1_id_obj.Hex()}, + }; + resource_scheduler.GetClusterResourceManager().SetNodeLabels(node_0, node_0_labels); + resource_scheduler.GetClusterResourceManager().SetNodeLabels(node_1, node_1_labels); + + ResourceRequest base_resource_request = CreateResourceRequest({{ResourceID::CPU(), 1}}); + int64_t violations; + bool is_infeasible; + rpc::SchedulingStrategy scheduling_strategy; + scheduling_strategy.mutable_default_scheduling_strategy(); + + // Schedule on a single specified node. + { + LabelSelector selector; + selector.AddConstraint(LabelConstraint( + "ray.io/node-id", LabelSelectorOperator::LABEL_IN, {node_0_id_obj.Hex()})); + ResourceRequest request = base_resource_request; + request.SetLabelSelector(selector); + + auto result_node_id = resource_scheduler.GetBestSchedulableNode(request, + scheduling_strategy, + false, + false, + std::string(), + &violations, + &is_infeasible); + ASSERT_EQ(result_node_id, node_0); + ASSERT_FALSE(is_infeasible); + } + + // Schedule on one of two specified nodes (in() operator). + { + LabelSelector selector; + selector.AddConstraint(LabelConstraint("ray.io/node-id", + LabelSelectorOperator::LABEL_IN, + {node_0_id_obj.Hex(), node_1_id_obj.Hex()})); + ResourceRequest request = base_resource_request; + request.SetLabelSelector(selector); + + auto result_node_id = resource_scheduler.GetBestSchedulableNode(request, + scheduling_strategy, + false, + false, + std::string(), + &violations, + &is_infeasible); + ASSERT_TRUE(result_node_id == node_0 || result_node_id == node_1); + ASSERT_FALSE(is_infeasible); + } + + // Scheduling is infeasible when all specified nodes are infeasible.. + { + NodeResources depleted_node_resources = CreateNodeResources({{ResourceID::CPU(), 0}}); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + node_0, depleted_node_resources); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + node_1, depleted_node_resources); + + LabelSelector selector; + selector.AddConstraint(LabelConstraint("ray.io/node-id", + LabelSelectorOperator::LABEL_IN, + {node_0_id_obj.Hex(), node_1_id_obj.Hex()})); + ResourceRequest request = base_resource_request; + request.SetLabelSelector(selector); + + auto result_node_id = resource_scheduler.GetBestSchedulableNode(request, + scheduling_strategy, + false, + false, + std::string(), + &violations, + &is_infeasible); + ASSERT_TRUE(result_node_id.IsNil()); + ASSERT_TRUE(is_infeasible); + } +} + } // namespace ray int main(int argc, char **argv) {