Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ray/common/scheduling/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ray_cc_library(
srcs = ["label_selector.cc"],
hdrs = ["label_selector.h"],
deps = [
"//src/ray/common:constants",
"//src/ray/protobuf:common_cc_proto",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/strings",
Expand Down
16 changes: 16 additions & 0 deletions src/ray/common/scheduling/label_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

#pragma once

#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "absl/container/flat_hash_set.h"
#include "google/protobuf/map.h"
#include "ray/common/constants.h"
#include "src/ray/protobuf/common.pb.h"

namespace ray {
Expand Down Expand Up @@ -104,4 +106,18 @@ H AbslHashValue(H h, const LabelSelector &label_selector) {
return h;
}

inline std::optional<absl::flat_hash_set<std::string>> GetHardNodeAffinityValues(
const LabelSelector &label_selector) {
const std::string hard_affinity_key(kLabelKeyNodeID);

for (const auto &constraint : label_selector.GetConstraints()) {
if (constraint.GetLabelKey() == hard_affinity_key) {
if (constraint.GetOperator() == LabelSelectorOperator::LABEL_IN) {
return constraint.GetLabelValues();
}
}
}
return std::nullopt;
}

} // namespace ray
11 changes: 11 additions & 0 deletions src/ray/core_worker/lease_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ std::pair<rpc::Address, bool> LocalityAwareLeasePolicy::GetBestNodeForLease(
return std::make_pair(fallback_rpc_address_, false);
}

// Node Affinity specified through label selectors has higher
// priority than locality aware scheduling.
if (auto node_id_values = GetHardNodeAffinityValues(spec.GetLabelSelector())) {
for (const auto &node_id_hex : *node_id_values) {
if (auto addr = node_addr_factory_(NodeID::FromHex(node_id_hex))) {
return std::make_pair(addr.value(), false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something we can do smarter in the follow-up is that if we have a list of nodes here, we should pick the node with the most arguments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm wondering if the logic should be in here where the core_worker finds the raylet to send the requests to or it should be in the the raylet logic where it finds the best node to send the task?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good question. Currently it has to be here since raylet doesn't have location information of objects (owner does).

}
}
return std::make_pair(fallback_rpc_address_, false);
}

if (spec.IsNodeAffinitySchedulingStrategy()) {
// The explicit node affinity scheduling strategy
// has higher priority than locality aware scheduling.
Expand Down
9 changes: 6 additions & 3 deletions src/ray/raylet/scheduling/cluster_lease_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,11 @@ void ClusterLeaseManager::ScheduleAndGrantLeases() {
<< lease.GetLeaseSpecification().LeaseId() << " is infeasible?"
<< is_infeasible;

if (lease.GetLeaseSpecification().IsNodeAffinitySchedulingStrategy() &&
!lease.GetLeaseSpecification().GetNodeAffinitySchedulingStrategySoft()) {
auto affinity_values =
GetHardNodeAffinityValues(lease.GetLeaseSpecification().GetLabelSelector());
if ((lease.GetLeaseSpecification().IsNodeAffinitySchedulingStrategy() &&
!lease.GetLeaseSpecification().GetNodeAffinitySchedulingStrategySoft()) ||
(affinity_values.has_value() && !affinity_values->empty())) {
// This can only happen if the target node doesn't exist or is infeasible.
// The lease will never be schedulable in either case so we should fail it.
if (cluster_resource_scheduler_.IsLocalNodeWithRaylet()) {
Expand Down Expand Up @@ -318,7 +321,7 @@ void ClusterLeaseManager::TryScheduleInfeasibleLease() {
/*requires_object_store_memory*/ false,
&is_infeasible);

// There is no node that has available resources to run the request.
// There is no node that has feasible resources to run the request.
// Move on to the next shape.
if (is_infeasible) {
RAY_LOG(DEBUG) << "No feasible node found for lease "
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/scheduling/cluster_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class ClusterResourceManager {
FRIEND_TEST(ClusterResourceSchedulerTest, TestForceSpillback);
FRIEND_TEST(ClusterResourceSchedulerTest, AffinityWithBundleScheduleTest);
FRIEND_TEST(ClusterResourceSchedulerTest, LabelSelectorIsSchedulableOnNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, LabelSelectorHardNodeAffinityTest);

friend class raylet::SchedulingPolicyTest;
friend class raylet_scheduling_policy::HybridSchedulingPolicyTest;
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ class ClusterResourceScheduler {
FRIEND_TEST(ClusterResourceSchedulerTest, TestForceSpillback);
FRIEND_TEST(ClusterResourceSchedulerTest, AffinityWithBundleScheduleTest);
FRIEND_TEST(ClusterResourceSchedulerTest, LabelSelectorIsSchedulableOnNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, LabelSelectorHardNodeAffinityTest);
};

} // end namespace ray
Original file line number Diff line number Diff line change
Expand Up @@ -1867,6 +1867,104 @@ TEST_F(ClusterResourceSchedulerTest, LabelSelectorIsSchedulableOnNodeTest) {
ASSERT_FALSE(is_infeasible);
}

TEST_F(ClusterResourceSchedulerTest, LabelSelectorHardNodeAffinityTest) {
// Setup scheduler with two nodes.
absl::flat_hash_map<ResourceID, double> node_resources_map({{ResourceID::CPU(), 1}});
NodeResources node_resources = CreateNodeResources(node_resources_map);
auto local_node_id = scheduling::NodeID(NodeID::FromRandom().Binary());
instrumented_io_context io_context;
ClusterResourceScheduler resource_scheduler(
io_context, local_node_id, {{"CPU", 0}}, is_node_available_fn_);

auto node_0_id_obj = NodeID::FromRandom();
auto node_1_id_obj = NodeID::FromRandom();
auto node_0 = scheduling::NodeID(node_0_id_obj.Binary());
auto node_1 = scheduling::NodeID(node_1_id_obj.Binary());
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_0, node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_1, node_resources);

// Set required node labels.
absl::flat_hash_map<std::string, std::string> node_0_labels = {
{"ray.io/node-id", node_0_id_obj.Hex()},
};
absl::flat_hash_map<std::string, std::string> node_1_labels = {
{"ray.io/node-id", node_1_id_obj.Hex()},
};
resource_scheduler.GetClusterResourceManager().SetNodeLabels(node_0, node_0_labels);
resource_scheduler.GetClusterResourceManager().SetNodeLabels(node_1, node_1_labels);

ResourceRequest base_resource_request = CreateResourceRequest({{ResourceID::CPU(), 1}});
int64_t violations;
bool is_infeasible;
rpc::SchedulingStrategy scheduling_strategy;
scheduling_strategy.mutable_default_scheduling_strategy();

// Schedule on a single specified node.
{
LabelSelector selector;
selector.AddConstraint(LabelConstraint(
"ray.io/node-id", LabelSelectorOperator::LABEL_IN, {node_0_id_obj.Hex()}));
ResourceRequest request = base_resource_request;
request.SetLabelSelector(selector);

auto result_node_id = resource_scheduler.GetBestSchedulableNode(request,
scheduling_strategy,
false,
false,
std::string(),
&violations,
&is_infeasible);
ASSERT_EQ(result_node_id, node_0);
ASSERT_FALSE(is_infeasible);
}

// Schedule on one of two specified nodes (in() operator).
{
LabelSelector selector;
selector.AddConstraint(LabelConstraint("ray.io/node-id",
LabelSelectorOperator::LABEL_IN,
{node_0_id_obj.Hex(), node_1_id_obj.Hex()}));
ResourceRequest request = base_resource_request;
request.SetLabelSelector(selector);

auto result_node_id = resource_scheduler.GetBestSchedulableNode(request,
scheduling_strategy,
false,
false,
std::string(),
&violations,
&is_infeasible);
ASSERT_TRUE(result_node_id == node_0 || result_node_id == node_1);
ASSERT_FALSE(is_infeasible);
}

// Scheduling is infeasible when all specified nodes are infeasible..
{
NodeResources depleted_node_resources = CreateNodeResources({{ResourceID::CPU(), 0}});
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
node_0, depleted_node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
node_1, depleted_node_resources);

LabelSelector selector;
selector.AddConstraint(LabelConstraint("ray.io/node-id",
LabelSelectorOperator::LABEL_IN,
{node_0_id_obj.Hex(), node_1_id_obj.Hex()}));
ResourceRequest request = base_resource_request;
request.SetLabelSelector(selector);

auto result_node_id = resource_scheduler.GetBestSchedulableNode(request,
scheduling_strategy,
false,
false,
std::string(),
&violations,
&is_infeasible);
ASSERT_TRUE(result_node_id.IsNil());
ASSERT_TRUE(is_infeasible);
}
}

} // namespace ray

int main(int argc, char **argv) {
Expand Down