diff --git a/python/ray/tests/test_draining.py b/python/ray/tests/test_draining.py
index 373fca4a8824..c66557bcac6f 100644
--- a/python/ray/tests/test_draining.py
+++ b/python/ray/tests/test_draining.py
@@ -12,6 +12,7 @@
     NodeAffinitySchedulingStrategy,
     PlacementGroupSchedulingStrategy,
 )
+from ray.util.state import list_tasks


 def test_idle_termination(ray_start_cluster):
@@ -571,6 +572,59 @@ def func(signal, nodes):
     ray.get(r1)


+def test_leases_rescheduling_during_draining(ray_start_cluster):
+    """Test that when a node is being drained, leases in the local lease manager
+    are cancelled and re-added to the cluster lease manager for rescheduling
+    instead of being marked as permanently infeasible.
+
+    This is a regression test for https://github.com/ray-project/ray/pull/57834/
+    """
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=0)
+    ray.init(address=cluster.address)
+
+    worker1 = cluster.add_node(num_cpus=1)
+    cluster.wait_for_nodes()
+
+    gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)
+
+    @ray.remote(num_cpus=1)
+    class Actor:
+        def ping(self):
+            pass
+
+    actor = Actor.remote()
+    ray.get(actor.ping.remote())
+
+    @ray.remote(num_cpus=1)
+    def get_node_id():
+        return ray.get_runtime_context().get_node_id()
+
+    obj_ref = get_node_id.options(name="f1").remote()
+
+    def verify_f1_pending_node_assignment():
+        tasks = list_tasks(filters=[("name", "=", "f1")])
+        assert len(tasks) == 1
+        assert tasks[0]["state"] == "PENDING_NODE_ASSIGNMENT"
+        return True
+
+    # f1 should be in the local lease manager of worker1,
+    # waiting for resources to become available.
+    wait_for_condition(verify_f1_pending_node_assignment)
+
+    is_accepted, _ = gcs_client.drain_node(
+        worker1.node_id,
+        autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
+        "preemption",
+        2**63 - 1,
+    )
+    assert is_accepted
+
+    # The task should be rescheduled on another node.
+    worker2 = cluster.add_node(num_cpus=1)
+    assert ray.get(obj_ref) == worker2.node_id
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))
diff --git a/src/mock/ray/raylet/local_lease_manager.h b/src/mock/ray/raylet/local_lease_manager.h
index 825dae47dde7..bb771dd5a6c8 100644
--- a/src/mock/ray/raylet/local_lease_manager.h
+++ b/src/mock/ray/raylet/local_lease_manager.h
@@ -31,6 +31,10 @@ class MockLocalLeaseManager : public LocalLeaseManagerInterface {
               rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
               const std::string &scheduling_failure_message),
              (override));
+  MOCK_METHOD(std::vector<std::shared_ptr<internal::Work>>,
+              CancelLeasesWithoutReply,
+              (std::function<bool(const std::shared_ptr<internal::Work> &)> predicate),
+              (override));
   MOCK_METHOD((const absl::flat_hash_map<SchedulingClass,
                                          std::deque<std::shared_ptr<internal::Work>>> &),
               GetLeasesToGrant,
diff --git a/src/ray/raylet/local_lease_manager.cc b/src/ray/raylet/local_lease_manager.cc
index 402bfd6e1358..d2b586a71933 100644
--- a/src/ray/raylet/local_lease_manager.cc
+++ b/src/ray/raylet/local_lease_manager.cc
@@ -32,6 +32,18 @@
 namespace ray {
 namespace raylet {

+namespace {
+void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
+                    rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
+                    const std::string &scheduling_failure_message) {
+  auto reply = work->reply_;
+  reply->set_canceled(true);
+  reply->set_failure_type(failure_type);
+  reply->set_scheduling_failure_message(scheduling_failure_message);
+  work->send_reply_callback_(Status::OK(), nullptr, nullptr);
+}
+}  // namespace
+
 LocalLeaseManager::LocalLeaseManager(
     const NodeID &self_node_id,
     ClusterResourceScheduler &cluster_resource_scheduler,
@@ -410,10 +422,10 @@ void LocalLeaseManager::GrantScheduledLeasesToWorkers() {
                        << front_lease.DebugString();
         auto leases_to_grant_queue_iter = leases_to_grant_queue.begin();
         while (leases_to_grant_queue_iter != leases_to_grant_queue.end()) {
-          CancelLeaseToGrant(
-              *leases_to_grant_queue_iter,
-              rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE,
-              "Lease granting failed due to the lease becoming infeasible.");
+          CancelLeaseToGrantWithoutReply(*leases_to_grant_queue_iter);
+          ReplyCancelled(*leases_to_grant_queue_iter,
+                         rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE,
+                         "Lease granting failed due to the lease becoming infeasible.");
           leases_to_grant_queue_iter =
               leases_to_grant_queue.erase(leases_to_grant_queue_iter);
         }
@@ -536,7 +548,7 @@ bool LocalLeaseManager::PoppedWorkerHandler(
     const rpc::Address &owner_address,
     const std::string &runtime_env_setup_error_message) {
   const auto &reply = work->reply_;
-  const auto &callback = work->callback_;
+  const auto &send_reply_callback = work->send_reply_callback_;
   const bool canceled = work->GetState() == internal::WorkStatus::CANCELLED;
   const auto &lease = work->lease_;
   bool granted = false;
@@ -650,7 +662,12 @@ bool LocalLeaseManager::PoppedWorkerHandler(
     RAY_LOG(DEBUG) << "Granting lease " << lease_id << " to worker "
                    << worker->WorkerId();

-    Grant(worker, leased_workers_, work->allocated_instances_, lease, reply, callback);
+    Grant(worker,
+          leased_workers_,
+          work->allocated_instances_,
+          lease,
+          reply,
+          send_reply_callback);
     erase_from_leases_to_grant_queue_fn(work, scheduling_class);
     granted = true;
   }
@@ -660,11 +677,11 @@ bool LocalLeaseManager::PoppedWorkerHandler(

 void LocalLeaseManager::Spillback(const NodeID &spillback_to,
                                   const std::shared_ptr<internal::Work> &work) {
-  auto send_reply_callback = work->callback_;
+  auto send_reply_callback = work->send_reply_callback_;

   if (work->grant_or_reject_) {
     work->reply_->set_rejected(true);
-    send_reply_callback();
+    send_reply_callback(Status::OK(), nullptr, nullptr);
     return;
   }

@@ -690,7 +707,7 @@ void LocalLeaseManager::Spillback(const NodeID &spillback_to,
   reply->mutable_retry_at_raylet_address()->set_port(node_info_ptr->node_manager_port());
   reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary());

-  send_reply_callback();
+  send_reply_callback(Status::OK(), nullptr, nullptr);
 }

 void LocalLeaseManager::LeasesUnblocked(const std::vector<LeaseID> &ready_ids) {
@@ -843,61 +860,52 @@ void LocalLeaseManager::ReleaseLeaseArgs(const LeaseID &lease_id) {
   }
 }

-namespace {
-void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
-                    rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
-                    const std::string &scheduling_failure_message) {
-  auto reply = work->reply_;
-  auto callback = work->callback_;
-  reply->set_canceled(true);
-  reply->set_failure_type(failure_type);
-  reply->set_scheduling_failure_message(scheduling_failure_message);
-  callback();
-}
-}  // namespace
-
-bool LocalLeaseManager::CancelLeases(
-    std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
-    rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
-    const std::string &scheduling_failure_message) {
-  bool tasks_cancelled = false;
+std::vector<std::shared_ptr<internal::Work>> LocalLeaseManager::CancelLeasesWithoutReply(
+    std::function<bool(const std::shared_ptr<internal::Work> &)> predicate) {
+  std::vector<std::shared_ptr<internal::Work>> cancelled_works;

   ray::erase_if<std::shared_ptr<internal::Work>>(
       leases_to_grant_, [&](const std::shared_ptr<internal::Work> &work) {
         if (!predicate(work)) {
           return false;
         }
-        CancelLeaseToGrant(work, failure_type, scheduling_failure_message);
-        tasks_cancelled = true;
+        CancelLeaseToGrantWithoutReply(work);
+        cancelled_works.push_back(work);
         return true;
       });

   ray::erase_if<std::shared_ptr<internal::Work>>(
       waiting_lease_queue_, [&](const std::shared_ptr<internal::Work> &work) {
-        if (predicate(work)) {
-          ReplyCancelled(work, failure_type, scheduling_failure_message);
-          if (!work->lease_.GetLeaseSpecification().GetDependencies().empty()) {
-            lease_dependency_manager_.RemoveLeaseDependencies(
-                work->lease_.GetLeaseSpecification().LeaseId());
-          }
-          waiting_leases_index_.erase(work->lease_.GetLeaseSpecification().LeaseId());
-          tasks_cancelled = true;
-          return true;
-        } else {
+        if (!predicate(work)) {
           return false;
         }
+        if (!work->lease_.GetLeaseSpecification().GetDependencies().empty()) {
+          lease_dependency_manager_.RemoveLeaseDependencies(
+              work->lease_.GetLeaseSpecification().LeaseId());
+        }
+        waiting_leases_index_.erase(work->lease_.GetLeaseSpecification().LeaseId());
+        cancelled_works.push_back(work);
+        return true;
       });

-  return tasks_cancelled;
+  return cancelled_works;
 }

-void LocalLeaseManager::CancelLeaseToGrant(
-    const std::shared_ptr<internal::Work> &work,
+bool LocalLeaseManager::CancelLeases(
+    std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
     rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
     const std::string &scheduling_failure_message) {
+  auto cancelled_works = CancelLeasesWithoutReply(predicate);
+  for (const auto &work : cancelled_works) {
+    ReplyCancelled(work, failure_type, scheduling_failure_message);
+  }
+  return !cancelled_works.empty();
+}
+
+void LocalLeaseManager::CancelLeaseToGrantWithoutReply(
+    const std::shared_ptr<internal::Work> &work) {
   const LeaseID lease_id = work->lease_.GetLeaseSpecification().LeaseId();
   RAY_LOG(DEBUG) << "Canceling lease " << lease_id << " from leases_to_grant_queue.";
-  ReplyCancelled(work, failure_type, scheduling_failure_message);
   if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
     // We've already acquired resources so we need to release them.
     cluster_resource_scheduler_.GetLocalResourceManager().ReleaseWorkerResources(
@@ -962,7 +970,7 @@ void LocalLeaseManager::Grant(
     const std::shared_ptr<TaskResourceInstances> &allocated_instances,
     const RayLease &lease,
     rpc::RequestWorkerLeaseReply *reply,
-    std::function<void()> send_reply_callback) {
+    rpc::SendReplyCallback send_reply_callback) {
   const auto &lease_spec = lease.GetLeaseSpecification();

   if (lease_spec.IsActorCreationTask()) {
@@ -1011,7 +1019,7 @@ void LocalLeaseManager::Grant(
     }
   }
   // Send the result back.
-  send_reply_callback();
+  send_reply_callback(Status::OK(), nullptr, nullptr);
 }

 void LocalLeaseManager::ClearWorkerBacklog(const WorkerID &worker_id) {
diff --git a/src/ray/raylet/local_lease_manager.h b/src/ray/raylet/local_lease_manager.h
index 40717e3e513d..c2fa2e8e1c52 100644
--- a/src/ray/raylet/local_lease_manager.h
+++ b/src/ray/raylet/local_lease_manager.h
@@ -122,6 +122,9 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
       rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
       const std::string &scheduling_failure_message) override;

+  std::vector<std::shared_ptr<internal::Work>> CancelLeasesWithoutReply(
+      std::function<bool(const std::shared_ptr<internal::Work> &)> predicate) override;
+
   /// Return with an exemplar if any leases are pending resource acquisition.
   ///
   /// \param[in,out] num_pending_actor_creation: Number of pending actor creation leases.
@@ -202,11 +205,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
       const std::string &runtime_env_setup_error_message);

   /// Cancels a lease in leases_to_grant_. Does not remove it from leases_to_grant_.
-  void CancelLeaseToGrant(
-      const std::shared_ptr<internal::Work> &work,
-      rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
-          rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
-      const std::string &scheduling_failure_message = "");
+  void CancelLeaseToGrantWithoutReply(const std::shared_ptr<internal::Work> &work);

   /// Attempts to grant all leases which are ready to run. A lease
   /// will be granted if it is on `leases_to_grant_` and there are still
@@ -250,7 +249,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
              const std::shared_ptr<TaskResourceInstances> &allocated_instances,
              const RayLease &lease,
              rpc::RequestWorkerLeaseReply *reply,
-             std::function<void()> send_reply_callback);
+             rpc::SendReplyCallback send_reply_callback);

   void Spillback(const NodeID &spillback_to, const std::shared_ptr<internal::Work> &work);

diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 972fea52d2d8..9731264347f4 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2083,7 +2083,23 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
     reply->set_is_accepted(true);
   }

+  const bool is_drain_accepted = reply->is_accepted();
   send_reply_callback(Status::OK(), nullptr, nullptr);
+
+  if (is_drain_accepted) {
+    // Cancel the leases queued in the local lease manager without replying
+    // and add them back to the cluster lease manager so that a new node
+    // can be selected by the scheduler.
+    auto cancelled_works = local_lease_manager_.CancelLeasesWithoutReply(
+        [&](const std::shared_ptr<internal::Work> &work) { return true; });
+    for (const auto &work : cancelled_works) {
+      cluster_lease_manager_.QueueAndScheduleLease(work->lease_,
+                                                   work->grant_or_reject_,
+                                                   work->is_selected_based_on_locality_,
+                                                   work->reply_,
+                                                   work->send_reply_callback_);
+    }
+  }
 }

 void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request,
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index a5a7de57de87..64315a8205df 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -302,6 +302,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
                               rpc::CancelWorkerLeaseReply *reply,
                               rpc::SendReplyCallback send_reply_callback) override;

+  /// Handle a `DrainRaylet` request.
+  void HandleDrainRaylet(rpc::DrainRayletRequest request,
+                         rpc::DrainRayletReply *reply,
+                         rpc::SendReplyCallback send_reply_callback) override;
+
  private:
   FRIEND_TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog);

@@ -589,11 +594,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
                             rpc::ShutdownRayletReply *reply,
                             rpc::SendReplyCallback send_reply_callback) override;

-  /// Handle a `DrainRaylet` request.
-  void HandleDrainRaylet(rpc::DrainRayletRequest request,
-                         rpc::DrainRayletReply *reply,
-                         rpc::SendReplyCallback send_reply_callback) override;
-
   void HandleIsLocalWorkerDead(rpc::IsLocalWorkerDeadRequest request,
                                rpc::IsLocalWorkerDeadReply *reply,
                                rpc::SendReplyCallback send_reply_callback) override;
diff --git a/src/ray/raylet/scheduling/BUILD.bazel b/src/ray/raylet/scheduling/BUILD.bazel
index f7f8ca3d4c5c..01987f04a65f 100644
--- a/src/ray/raylet/scheduling/BUILD.bazel
+++ b/src/ray/raylet/scheduling/BUILD.bazel
@@ -36,6 +36,7 @@ ray_cc_library(
         "//src/ray/common:lease",
         "//src/ray/common/scheduling:cluster_resource_data",
         "//src/ray/protobuf:node_manager_cc_proto",
+        "//src/ray/rpc:rpc_callback_types",
     ],
 )

diff --git a/src/ray/raylet/scheduling/cluster_lease_manager.cc b/src/ray/raylet/scheduling/cluster_lease_manager.cc
index 92096bc8de40..a1cff110ab6c 100644
--- a/src/ray/raylet/scheduling/cluster_lease_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_lease_manager.cc
@@ -53,14 +53,11 @@ void ClusterLeaseManager::QueueAndScheduleLease(
   RAY_LOG(DEBUG) << "Queuing and scheduling lease "
                  << lease.GetLeaseSpecification().LeaseId();
   const auto scheduling_class = lease.GetLeaseSpecification().GetSchedulingClass();
-  auto work = std::make_shared<internal::Work>(
-      std::move(lease),
-      grant_or_reject,
-      is_selected_based_on_locality,
-      reply,
-      [send_reply_callback = std::move(send_reply_callback)] {
-        send_reply_callback(Status::OK(), nullptr, nullptr);
-      });
+  auto work = std::make_shared<internal::Work>(std::move(lease),
+                                               grant_or_reject,
+                                               is_selected_based_on_locality,
+                                               reply,
+                                               std::move(send_reply_callback));
   // If the scheduling class is infeasible, just add the work to the infeasible queue
   // directly.
   auto infeasible_leases_iter = infeasible_leases_.find(scheduling_class);
@@ -77,11 +74,10 @@ void ReplyCancelled(const internal::Work &work,
                     rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
                     const std::string &scheduling_failure_message) {
   auto reply = work.reply_;
-  auto callback = work.callback_;
   reply->set_canceled(true);
   reply->set_failure_type(failure_type);
   reply->set_scheduling_failure_message(scheduling_failure_message);
-  callback();
+  work.send_reply_callback_(Status::OK(), nullptr, nullptr);
 }
 }  // namespace

@@ -429,11 +425,11 @@ void ClusterLeaseManager::ScheduleOnNode(const NodeID &spillback_to,
     return;
   }

-  auto send_reply_callback = work->callback_;
+  auto send_reply_callback = work->send_reply_callback_;

   if (work->grant_or_reject_) {
     work->reply_->set_rejected(true);
-    send_reply_callback();
+    send_reply_callback(Status::OK(), nullptr, nullptr);
     return;
   }

@@ -459,7 +455,7 @@ void ClusterLeaseManager::ScheduleOnNode(const NodeID &spillback_to,
   reply->mutable_retry_at_raylet_address()->set_port((*node_info).node_manager_port());
   reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary());

-  send_reply_callback();
+  send_reply_callback(Status::OK(), nullptr, nullptr);
 }

 ClusterResourceScheduler &ClusterLeaseManager::GetClusterResourceScheduler() const {
diff --git a/src/ray/raylet/scheduling/internal.h b/src/ray/raylet/scheduling/internal.h
index 79d7c4cac60f..99103a34a729 100644
--- a/src/ray/raylet/scheduling/internal.h
+++ b/src/ray/raylet/scheduling/internal.h
@@ -19,6 +19,7 @@

 #include "ray/common/lease/lease.h"
 #include "ray/common/scheduling/cluster_resource_data.h"
+#include "ray/rpc/rpc_callback_types.h"
 #include "src/ray/protobuf/node_manager.pb.h"

 namespace ray::raylet::internal {
@@ -57,19 +58,19 @@ class Work {
   bool grant_or_reject_;
   bool is_selected_based_on_locality_;
   rpc::RequestWorkerLeaseReply *reply_;
-  std::function<void()> callback_;
+  rpc::SendReplyCallback send_reply_callback_;
   std::shared_ptr<TaskResourceInstances> allocated_instances_;
   Work(RayLease lease,
        bool grant_or_reject,
        bool is_selected_based_on_locality,
        rpc::RequestWorkerLeaseReply *reply,
-       std::function<void()> callback,
+       rpc::SendReplyCallback send_reply_callback,
        WorkStatus status = WorkStatus::WAITING)
       : lease_(std::move(lease)),
         grant_or_reject_(grant_or_reject),
         is_selected_based_on_locality_(is_selected_based_on_locality),
         reply_(reply),
-        callback_(std::move(callback)),
+        send_reply_callback_(std::move(send_reply_callback)),
         allocated_instances_(nullptr),
         status_(status){};
   Work(const Work &Work) = delete;
diff --git a/src/ray/raylet/scheduling/local_lease_manager_interface.h b/src/ray/raylet/scheduling/local_lease_manager_interface.h
index 8017efb1be13..19ff311cb1cc 100644
--- a/src/ray/raylet/scheduling/local_lease_manager_interface.h
+++ b/src/ray/raylet/scheduling/local_lease_manager_interface.h
@@ -53,6 +53,12 @@ class LocalLeaseManagerInterface {
       rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
       const std::string &scheduling_failure_message) = 0;

+  /// Similar to `CancelLeases`, except that this method does not send
+  /// RequestWorkerLease replies for the cancelled leases.
+  /// \return The list of cancelled work items.
+  virtual std::vector<std::shared_ptr<internal::Work>> CancelLeasesWithoutReply(
+      std::function<bool(const std::shared_ptr<internal::Work> &)> predicate) = 0;
+
   virtual const absl::flat_hash_map<SchedulingClass,
                                     std::deque<std::shared_ptr<internal::Work>>>
       &GetLeasesToGrant() const = 0;

@@ -114,6 +120,11 @@ class NoopLocalLeaseManager : public LocalLeaseManagerInterface {
     return false;
   }

+  std::vector<std::shared_ptr<internal::Work>> CancelLeasesWithoutReply(
+      std::function<bool(const std::shared_ptr<internal::Work> &)> predicate) override {
+    return {};
+  }
+
   const absl::flat_hash_map<SchedulingClass,
                             std::deque<std::shared_ptr<internal::Work>>>
       &GetLeasesToGrant() const override {
     static const absl::flat_hash_map local_lease_manager_; };

+TEST_F(LocalLeaseManagerTest, TestCancelLeasesWithoutReply) {
+  int num_callbacks_called = 0;
+  auto callback = [&num_callbacks_called](Status status,
+                                          std::function<void()> success,
+                                          std::function<void()> failure) {
+    ++num_callbacks_called;
+  };
+
+  auto lease1 = CreateLease({{ray::kCPU_ResourceLabel, 1}}, "f");
+  rpc::RequestWorkerLeaseReply reply1;
+  // lease1 is waiting for a worker
+  local_lease_manager_->QueueAndScheduleLease(std::make_shared<internal::Work>(
+      lease1, false, false, &reply1, callback, internal::WorkStatus::WAITING));
+
+  auto arg_id = ObjectID::FromRandom();
+  std::vector<std::unique_ptr<TaskArg>> args;
+  args.push_back(
+      std::make_unique<TaskArgByReference>(arg_id, rpc::Address{}, "call_site"));
+  auto lease2 = CreateLease({{kCPU_ResourceLabel, 1}}, "f", args);
+  EXPECT_CALL(object_manager_, Pull(_, _, _)).WillOnce(::testing::Return(1));
+  rpc::RequestWorkerLeaseReply reply2;
+  // lease2 is waiting for args
+  local_lease_manager_->QueueAndScheduleLease(std::make_shared<internal::Work>(
+      lease2, false, false, &reply2, callback, internal::WorkStatus::WAITING));
+
+  auto cancelled_works = local_lease_manager_->CancelLeasesWithoutReply(
+      [](const std::shared_ptr<internal::Work> &work) { return true; });
+  ASSERT_EQ(cancelled_works.size(), 2);
+  // Make sure the reply is not sent.
+  ASSERT_EQ(num_callbacks_called, 0);
+}
+
 TEST_F(LocalLeaseManagerTest, TestLeaseGrantingOrder) {
   // Initial setup: 3 CPUs available.
   std::shared_ptr<MockWorker> worker1 =
@@ -398,11 +430,21 @@ TEST_F(LocalLeaseManagerTest, TestLeaseGrantingOrder) {
   auto lease_f2 = CreateLease({{ray::kCPU_ResourceLabel, 1}}, "f");
   rpc::RequestWorkerLeaseReply reply;
   local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared<internal::Work>(
-      lease_f1, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
+      lease_f1,
+      false,
+      false,
+      &reply,
+      [](Status status, std::function<void()> success, std::function<void()> failure) {},
+      internal::WorkStatus::WAITING));
   local_lease_manager_->ScheduleAndGrantLeases();
   pool_.TriggerCallbacks();
   local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared<internal::Work>(
-      lease_f2, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
+      lease_f2,
+      false,
+      false,
+      &reply,
+      [](Status status, std::function<void()> success, std::function<void()> failure) {},
+      internal::WorkStatus::WAITING));
   local_lease_manager_->ScheduleAndGrantLeases();
   pool_.TriggerCallbacks();

@@ -412,13 +454,33 @@
   auto lease_f5 = CreateLease({{ray::kCPU_ResourceLabel, 1}}, "f");
   auto lease_g1 = CreateLease({{ray::kCPU_ResourceLabel, 1}}, "g");
   local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared<internal::Work>(
-      lease_f3, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
+      lease_f3,
+      false,
+      false,
+      &reply,
+      [](Status status, std::function<void()> success, std::function<void()> failure) {},
+      internal::WorkStatus::WAITING));
   local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared<internal::Work>(
-      lease_f4, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
+      lease_f4,
+      false,
+      false,
+      &reply,
+      [](Status status, std::function<void()> success, std::function<void()> failure) {},
+      internal::WorkStatus::WAITING));
   local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared<internal::Work>(
-      lease_f5, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
+      lease_f5,
+      false,
+      false,
+      &reply,
+      [](Status status, std::function<void()> success, std::function<void()> failure) {},
+      internal::WorkStatus::WAITING));
   local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared<internal::Work>(
-      lease_g1, false, false, &reply, [] {}, internal::WorkStatus::WAITING));
+      lease_g1,
+      false,
+      false,
+      &reply,
+      [](Status status, std::function<void()> success, std::function<void()> failure) {},
+      internal::WorkStatus::WAITING));
   local_lease_manager_->ScheduleAndGrantLeases();
   pool_.TriggerCallbacks();
   auto leases_to_grant_ = local_lease_manager_->GetLeasesToGrant();
@@ -451,7 +513,11 @@ TEST_F(LocalLeaseManagerTest, TestNoLeakOnImpossibleInfeasibleLease) {
   // Submit the leases to the local lease manager.
   int num_callbacks_called = 0;
-  auto callback = [&num_callbacks_called]() { ++num_callbacks_called; };
+  auto callback = [&num_callbacks_called](Status status,
+                                          std::function<void()> success,
+                                          std::function<void()> failure) {
+    ++num_callbacks_called;
+  };
   rpc::RequestWorkerLeaseReply reply1;
   local_lease_manager_->QueueAndScheduleLease(std::make_shared<internal::Work>(
       lease1, false, false, &reply1, callback, internal::WorkStatus::WAITING));
diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc
index 70e7f0ed923a..089912d1a33e 100644
--- a/src/ray/raylet/tests/node_manager_test.cc
+++ b/src/ray/raylet/tests/node_manager_test.cc
@@ -434,7 +434,7 @@ class NodeManagerTest : public ::testing::Test {
   std::unique_ptr core_worker_subscriber_;
   std::unique_ptr cluster_resource_scheduler_;
   std::unique_ptr local_lease_manager_;
-  std::unique_ptr cluster_lease_manager_;
+  std::unique_ptr cluster_lease_manager_;
   std::shared_ptr local_object_manager_;
   std::unique_ptr lease_dependency_manager_;
   std::unique_ptr mock_gcs_client_ =
@@ -1104,6 +1104,40 @@ size_t GetPendingLeaseWorkerCount(const LocalLeaseManager &local_lease_manager)
          local_lease_manager.leases_to_grant_.size();
 }

+TEST_F(NodeManagerTest, TestReschedulingLeasesDuringHandleDrainRaylet) {
+  // Test that when the node is being drained, leases in the local lease manager
+  // are cancelled and re-added to the cluster lease manager for rescheduling.
+  auto lease_spec = BuildLeaseSpec({});
+  rpc::RequestWorkerLeaseRequest request_worker_lease_request;
+  rpc::RequestWorkerLeaseReply request_worker_lease_reply;
+  LeaseID lease_id = LeaseID::FromRandom();
+  lease_spec.GetMutableMessage().set_lease_id(lease_id.Binary());
+  request_worker_lease_request.mutable_lease_spec()->CopyFrom(lease_spec.GetMessage());
+  request_worker_lease_request.set_backlog_size(1);
+  request_worker_lease_request.set_grant_or_reject(true);
+  request_worker_lease_request.set_is_selected_based_on_locality(true);
+  node_manager_->HandleRequestWorkerLease(
+      request_worker_lease_request,
+      &request_worker_lease_reply,
+      [](Status s, std::function<void()> success, std::function<void()> failure) {
+        ASSERT_FALSE(true) << "This callback should not be called.";
+      });
+  ASSERT_EQ(GetPendingLeaseWorkerCount(*local_lease_manager_), 1);
+  rpc::DrainRayletRequest drain_raylet_request;
+  rpc::DrainRayletReply drain_raylet_reply;
+  drain_raylet_request.set_reason(
+      rpc::autoscaler::DrainNodeReason::DRAIN_NODE_REASON_PREEMPTION);
+  node_manager_->HandleDrainRaylet(
+      drain_raylet_request,
+      &drain_raylet_reply,
+      [](Status s, std::function<void()> success, std::function<void()> failure) {
+        ASSERT_TRUE(s.ok());
+      });
+  ASSERT_EQ(GetPendingLeaseWorkerCount(*local_lease_manager_), 0);
+  // The lease is infeasible now since the local node is draining.
+  ASSERT_EQ(cluster_lease_manager_->GetInfeasibleQueueSize(), 1);
+}
+
 TEST_F(NodeManagerTest, RetryHandleCancelWorkerLeaseWhenHasLeaseRequest) {
   auto lease_spec = BuildLeaseSpec({});
   rpc::RequestWorkerLeaseRequest request_worker_lease_request;