Skip to content

Commit

Permalink
[core] Fix the spilling back failure in case of node missing (#19564)
Browse files Browse the repository at this point in the history
## Why are these changes needed?
When Ray spills a task back to another node, it checks through the GCS whether that node exists, so there is a race condition, and the raylet sometimes crashes because of it.

This PR filters out nodes that are not available at the time the node is selected.

## Related issue number
#19438
  • Loading branch information
fishbone authored Oct 22, 2021
1 parent 530f2d7 commit 48fb86a
Show file tree
Hide file tree
Showing 20 changed files with 290 additions and 295 deletions.
3 changes: 3 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ cc_test(
copts = COPTS,
tags = ["team:core"],
deps = [
":ray_mock",
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
Expand Down Expand Up @@ -1001,6 +1002,7 @@ cc_test(
copts = COPTS,
tags = ["team:core"],
deps = [
":ray_mock",
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
Expand Down Expand Up @@ -1206,6 +1208,7 @@ cc_test(
"gcs_test_util_lib",
"ray_common",
"raylet_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)
Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class MockNodeInfoAccessor : public NodeInfoAccessor {
((const SubscribeCallback<NodeID, rpc::GcsNodeInfo> &subscribe),
const StatusCallback &done),
(override));
MOCK_METHOD(absl::optional<rpc::GcsNodeInfo>, Get,
MOCK_METHOD(const rpc::GcsNodeInfo *, Get,
(const NodeID &node_id, bool filter_dead_nodes), (const, override));
MOCK_METHOD((const std::unordered_map<NodeID, rpc::GcsNodeInfo> &), GetAll, (),
(const, override));
Expand Down
30 changes: 30 additions & 0 deletions src/mock/ray/gcs/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,40 @@ class MockGcsClient : public GcsClient {
MOCK_METHOD((std::pair<std::string, int>), GetGcsServerAddress, (), (override));
MOCK_METHOD(std::string, DebugString, (), (const, override));
// Constructs a GcsClient whose accessors are all gmock objects.
// Each Mock*Accessor is allocated with raw `new` and then handed to the
// corresponding smart-pointer member of the GcsClient base class via
// reset(), so the base class takes ownership. The raw `mock_*` pointers
// saved below are non-owning handles that tests use to install
// EXPECT_CALL expectations.
MockGcsClient() {
mock_job_accessor = new MockJobInfoAccessor();
mock_actor_accessor = new MockActorInfoAccessor();
mock_object_accessor = new MockObjectInfoAccessor();
mock_node_accessor = new MockNodeInfoAccessor();
mock_node_resource_accessor = new MockNodeResourceInfoAccessor();
mock_task_accessor = new MockTaskInfoAccessor();
mock_error_accessor = new MockErrorInfoAccessor();
mock_stats_accessor = new MockStatsInfoAccessor();
mock_worker_accessor = new MockWorkerInfoAccessor();
mock_placement_group_accessor = new MockPlacementGroupInfoAccessor();
// NOTE(review): mock_internal_kv_accessor is allocated here but is never
// reset() into a GcsClient member below, so it appears to be leaked —
// confirm whether GcsClient has an internal-KV accessor slot it should
// be installed into.
mock_internal_kv_accessor = new MockInternalKVAccessor();

GcsClient::job_accessor_.reset(mock_job_accessor);
GcsClient::actor_accessor_.reset(mock_actor_accessor);
GcsClient::node_accessor_.reset(mock_node_accessor);
GcsClient::node_resource_accessor_.reset(mock_node_resource_accessor);
GcsClient::task_accessor_.reset(mock_task_accessor);
GcsClient::object_accessor_.reset(mock_object_accessor);
GcsClient::stats_accessor_.reset(mock_stats_accessor);
GcsClient::error_accessor_.reset(mock_error_accessor);
GcsClient::worker_accessor_.reset(mock_worker_accessor);
GcsClient::placement_group_accessor_.reset(mock_placement_group_accessor);
}
MockActorInfoAccessor *mock_actor_accessor;
MockJobInfoAccessor *mock_job_accessor;
MockObjectInfoAccessor *mock_object_accessor;
MockNodeInfoAccessor *mock_node_accessor;
MockNodeResourceInfoAccessor *mock_node_resource_accessor;
MockTaskInfoAccessor *mock_task_accessor;
MockErrorInfoAccessor *mock_error_accessor;
MockStatsInfoAccessor *mock_stats_accessor;
MockWorkerInfoAccessor *mock_worker_accessor;
MockPlacementGroupInfoAccessor *mock_placement_group_accessor;
MockInternalKVAccessor *mock_internal_kv_accessor;
};

} // namespace gcs
Expand Down
5 changes: 0 additions & 5 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ RAY_CONFIG(bool, lineage_pinning_enabled, false)
/// See also: https://github.com/ray-project/ray/issues/14182
RAY_CONFIG(bool, preallocate_plasma_memory, false)

/// Whether to use the hybrid scheduling policy, or one of the legacy spillback
/// strategies. In the hybrid scheduling strategy, leases are packed until a threshold,
/// then spread via weighted (by critical resource usage).
RAY_CONFIG(bool, scheduler_hybrid_scheduling, true)

/// The fraction of resource utilization on a node after which the scheduler starts
/// to prefer spreading tasks to other nodes. This balances between locality and
/// even balancing of load. Low values (min 0.0) encourage more load spreading.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_

auto check_node_alive_fn = [this](const NodeID &node_id) {
auto node = gcs_client_->Nodes().Get(node_id);
return node.has_value();
return node != nullptr;
};
auto reconstruct_object_callback = [this](const ObjectID &object_id) {
io_service_.post(
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,8 @@ class NodeInfoAccessor {
/// \param filter_dead_nodes Whether this method will filter out dead nodes.
/// \return A pointer to the item returned by GCS. If the item to read doesn't exist
/// or the node is dead, nullptr is returned.
virtual absl::optional<rpc::GcsNodeInfo> Get(const NodeID &node_id,
bool filter_dead_nodes = true) const = 0;
virtual const rpc::GcsNodeInfo *Get(const NodeID &node_id,
bool filter_dead_nodes = true) const = 0;

/// Get information of all nodes from local cache.
/// Non-thread safe.
Expand Down
10 changes: 5 additions & 5 deletions src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,17 +540,17 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange(
});
}

absl::optional<GcsNodeInfo> ServiceBasedNodeInfoAccessor::Get(
const NodeID &node_id, bool filter_dead_nodes) const {
const GcsNodeInfo *ServiceBasedNodeInfoAccessor::Get(const NodeID &node_id,
bool filter_dead_nodes) const {
RAY_CHECK(!node_id.IsNil());
auto entry = node_cache_.find(node_id);
if (entry != node_cache_.end()) {
if (filter_dead_nodes && entry->second.state() == rpc::GcsNodeInfo::DEAD) {
return absl::nullopt;
return nullptr;
}
return entry->second;
return &entry->second;
}
return absl::nullopt;
return nullptr;
}

const std::unordered_map<NodeID, GcsNodeInfo> &ServiceBasedNodeInfoAccessor::GetAll()
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
const SubscribeCallback<NodeID, rpc::GcsNodeInfo> &subscribe,
const StatusCallback &done) override;

absl::optional<rpc::GcsNodeInfo> Get(const NodeID &node_id,
bool filter_dead_nodes = false) const override;
const rpc::GcsNodeInfo *Get(const NodeID &node_id,
bool filter_dead_nodes = false) const override;

const std::unordered_map<NodeID, rpc::GcsNodeInfo> &GetAll() const override;

Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/test/gcs_server_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,9 @@ struct GcsServerMocker {
return Status::NotImplemented("");
}

absl::optional<rpc::GcsNodeInfo> Get(const NodeID &node_id,
bool filter_dead_nodes = true) const override {
return absl::nullopt;
const rpc::GcsNodeInfo *Get(const NodeID &node_id,
bool filter_dead_nodes = true) const override {
return nullptr;
}

const std::unordered_map<NodeID, rpc::GcsNodeInfo> &GetAll() const override {
Expand Down
6 changes: 3 additions & 3 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
cluster_resource_scheduler_ =
std::shared_ptr<ClusterResourceScheduler>(new ClusterResourceScheduler(
self_node_id_.Binary(), local_resources.GetTotalResources().GetResourceMap(),
[this]() { return object_manager_.GetUsedMemory(); },
*gcs_client_, [this]() { return object_manager_.GetUsedMemory(); },
[this]() { return object_manager_.PullManagerHasPullsQueued(); }));

auto get_node_info_func = [this](const NodeID &node_id) {
Expand All @@ -330,7 +330,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
failed_nodes_cache_.count(owner_node_id) > 0);
};
cluster_task_manager_ = std::shared_ptr<ClusterTaskManager>(new ClusterTaskManager(
cluster_task_manager_ = std::make_shared<ClusterTaskManager>(
self_node_id_,
std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
dependency_manager_, is_owner_alive, get_node_info_func, announce_infeasible_task,
Expand All @@ -339,7 +339,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
std::vector<std::unique_ptr<RayObject>> *results) {
return GetObjectsFromPlasma(object_ids, results);
},
max_task_args_memory));
max_task_args_memory);
placement_group_resource_manager_ = std::make_shared<NewPlacementGroupResourceManager>(
std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
// TODO (Alex): Ideally we could do these in a more robust way (retry
Expand Down
45 changes: 27 additions & 18 deletions src/ray/raylet/placement_group_resource_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// clang-format off
#include "ray/raylet/placement_group_resource_manager.h"

#include <memory>
Expand All @@ -21,18 +22,26 @@
#include "ray/common/id.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/gcs/test/gcs_test_util.h"
#include "mock/ray/gcs/gcs_client.h"
// clang-format on

namespace ray {

class NewPlacementGroupResourceManagerTest : public ::testing::Test {
public:
std::unique_ptr<raylet::NewPlacementGroupResourceManager>
new_placement_group_resource_manager_;

std::unique_ptr<gcs::MockGcsClient> gcs_client_;
rpc::GcsNodeInfo node_info_;
void SetUp() {
gcs_client_ = std::make_unique<gcs::MockGcsClient>();
EXPECT_CALL(*gcs_client_->mock_node_accessor, Get(::testing::_, ::testing::_))
.WillRepeatedly(::testing::Return(&node_info_));
}
void InitLocalAvailableResource(
absl::flat_hash_map<std::string, double> &unit_resource) {
auto cluster_resource_scheduler_ =
std::make_shared<ClusterResourceScheduler>("local", unit_resource);
std::make_shared<ClusterResourceScheduler>("local", unit_resource, *gcs_client_);
new_placement_group_resource_manager_.reset(
new raylet::NewPlacementGroupResourceManager(
cluster_resource_scheduler_,
Expand Down Expand Up @@ -110,8 +119,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
{"CPU", 1.0},
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 1000}};
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", remaining_resources);
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
Expand Down Expand Up @@ -139,8 +148,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) {
new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
ASSERT_TRUE(delete_called_);
/// 5. check remaining resources is correct.
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", unit_resource);
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", unit_resource, *gcs_client_);
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
Expand Down Expand Up @@ -175,8 +184,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_2_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 2000}};
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", remaining_resources);
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
Expand All @@ -197,8 +206,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
{"CPU", 2.0},
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 2000}};
remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", remaining_resources);
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
{{"CPU_group_" + group_id.Hex(), 1.0},
{"CPU", 1.0},
Expand All @@ -210,8 +219,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec);
/// 8. check remaining resources is correct after all bundle returned.
remaining_resources = {{"CPU", 2.0}};
remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", remaining_resources);
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
ASSERT_TRUE(update_called_);
ASSERT_TRUE(delete_called_);
Expand All @@ -234,8 +243,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare)
}
/// 4. check remaining resources is correct.
absl::flat_hash_map<std::string, double> remaining_resources = {{"CPU", 3.0}};
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", remaining_resources);
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
Expand Down Expand Up @@ -266,8 +275,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
{"CPU", 3.0},
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 1000}};
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", remaining_resources);
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
Expand All @@ -288,8 +297,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
new_placement_group_resource_manager_->CommitBundle(bundle_spec);
// 8. check remaining resources is correct.
remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", available_resource);
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", available_resource, *gcs_client_);
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
}
Expand Down
Loading

0 comments on commit 48fb86a

Please sign in to comment.