
Commit 0ba27e3

jjyao authored and elliot-barn committed
[Core] Reschedule leases in local lease manager when draining the node (#57834)
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: elliot-barn <[email protected]>
1 parent 1da4ea3 commit 0ba27e3

12 files changed (+270, -80 lines)

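At a high level, this commit changes what happens to leases still queued in a draining raylet's local lease manager: instead of replying to their owners with a permanent scheduling failure, the raylet now cancels them without replying and hands them back to the cluster lease manager so the scheduler can pick another node. The snippet below is a minimal, self-contained model of that pattern; the names (`LocalQueue`, `CancelWithoutReply`, `ClusterQueue`, `QueueAndSchedule`) are illustrative stand-ins, not Ray's real APIs — see the `node_manager.cc` and `local_lease_manager.cc` diffs below for the actual call sites.

```cpp
#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Simplified stand-in for the queued lease request (not Ray's internal::Work).
struct Work {
  std::string lease_id;
};

// Models the local lease manager: leases queued locally, waiting for resources.
class LocalQueue {
 public:
  void Add(std::shared_ptr<Work> w) { queue_.push_back(std::move(w)); }

  // Models CancelLeasesWithoutReply: remove matching leases and hand them back
  // to the caller instead of replying with a scheduling failure.
  std::vector<std::shared_ptr<Work>> CancelWithoutReply(
      const std::function<bool(const std::shared_ptr<Work> &)> &predicate) {
    std::vector<std::shared_ptr<Work>> cancelled;
    for (auto it = queue_.begin(); it != queue_.end();) {
      if (predicate(*it)) {
        cancelled.push_back(*it);
        it = queue_.erase(it);
      } else {
        ++it;
      }
    }
    return cancelled;
  }

 private:
  std::deque<std::shared_ptr<Work>> queue_;
};

// Models the cluster lease manager: can pick a different node for a lease.
class ClusterQueue {
 public:
  void QueueAndSchedule(const std::shared_ptr<Work> &w) {
    std::cout << "rescheduling lease " << w->lease_id << " cluster-wide\n";
  }
};

int main() {
  LocalQueue local;
  ClusterQueue cluster;
  local.Add(std::make_shared<Work>(Work{"f1"}));

  // Drain accepted: instead of failing the locally queued leases, pull them
  // out without replying and requeue them for cluster-wide scheduling.
  for (const auto &w : local.CancelWithoutReply(
           [](const std::shared_ptr<Work> &) { return true; })) {
    cluster.QueueAndSchedule(w);
  }
  return 0;
}
```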

python/ray/tests/test_draining.py

Lines changed: 54 additions & 0 deletions
@@ -12,6 +12,7 @@
     NodeAffinitySchedulingStrategy,
     PlacementGroupSchedulingStrategy,
 )
+from ray.util.state import list_tasks
 
 
 def test_idle_termination(ray_start_cluster):
@@ -571,6 +572,59 @@ def func(signal, nodes):
     ray.get(r1)
 
 
+def test_leases_rescheduling_during_draining(ray_start_cluster):
+    """Test that when a node is being drained, leases inside local lease manager
+    will be cancelled and re-added to the cluster lease manager for rescheduling
+    instead of being marked as permanently infeasible.
+
+    This is regression test for https://github.com/ray-project/ray/pull/57834/
+    """
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=0)
+    ray.init(address=cluster.address)
+
+    worker1 = cluster.add_node(num_cpus=1)
+    cluster.wait_for_nodes()
+
+    gcs_client = GcsClient(address=ray.get_runtime_context().gcs_address)
+
+    @ray.remote(num_cpus=1)
+    class Actor:
+        def ping(self):
+            pass
+
+    actor = Actor.remote()
+    ray.get(actor.ping.remote())
+
+    @ray.remote(num_cpus=1)
+    def get_node_id():
+        return ray.get_runtime_context().get_node_id()
+
+    obj_ref = get_node_id.options(name="f1").remote()
+
+    def verify_f1_pending_node_assignment():
+        tasks = list_tasks(filters=[("name", "=", "f1")])
+        assert len(tasks) == 1
+        assert tasks[0]["state"] == "PENDING_NODE_ASSIGNMENT"
+        return True
+
+    # f1 should be in the local lease manager of worker1,
+    # waiting for resource to be available.
+    wait_for_condition(verify_f1_pending_node_assignment)
+
+    is_accepted, _ = gcs_client.drain_node(
+        worker1.node_id,
+        autoscaler_pb2.DrainNodeReason.Value("DRAIN_NODE_REASON_PREEMPTION"),
+        "preemption",
+        2**63 - 1,
+    )
+    assert is_accepted
+
+    # The task should be rescheduled on another node.
+    worker2 = cluster.add_node(num_cpus=1)
+    assert ray.get(obj_ref) == worker2.node_id
+
+
 if __name__ == "__main__":
 
     sys.exit(pytest.main(["-sv", __file__]))

src/mock/ray/raylet/local_lease_manager.h

Lines changed: 4 additions & 0 deletions
@@ -31,6 +31,10 @@ class MockLocalLeaseManager : public LocalLeaseManagerInterface {
               rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
               const std::string &scheduling_failure_message),
              (override));
+  MOCK_METHOD(std::vector<std::shared_ptr<internal::Work>>,
+              CancelLeasesWithoutReply,
+              (std::function<bool(const std::shared_ptr<internal::Work> &)> predicate),
+              (override));
   MOCK_METHOD((const absl::flat_hash_map<SchedulingClass,
                                          std::deque<std::shared_ptr<internal::Work>>> &),
               GetLeasesToGrant,

src/ray/raylet/local_lease_manager.cc

Lines changed: 53 additions & 45 deletions
@@ -32,6 +32,18 @@
 namespace ray {
 namespace raylet {
 
+namespace {
+void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
+                    rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
+                    const std::string &scheduling_failure_message) {
+  auto reply = work->reply_;
+  reply->set_canceled(true);
+  reply->set_failure_type(failure_type);
+  reply->set_scheduling_failure_message(scheduling_failure_message);
+  work->send_reply_callback_(Status::OK(), nullptr, nullptr);
+}
+}  // namespace
+
 LocalLeaseManager::LocalLeaseManager(
     const NodeID &self_node_id,
     ClusterResourceScheduler &cluster_resource_scheduler,
@@ -410,10 +422,10 @@ void LocalLeaseManager::GrantScheduledLeasesToWorkers() {
                        << front_lease.DebugString();
         auto leases_to_grant_queue_iter = leases_to_grant_queue.begin();
         while (leases_to_grant_queue_iter != leases_to_grant_queue.end()) {
-          CancelLeaseToGrant(
-              *leases_to_grant_queue_iter,
-              rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE,
-              "Lease granting failed due to the lease becoming infeasible.");
+          CancelLeaseToGrantWithoutReply(*leases_to_grant_queue_iter);
+          ReplyCancelled(*leases_to_grant_queue_iter,
+                         rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_UNSCHEDULABLE,
+                         "Lease granting failed due to the lease becoming infeasible.");
           leases_to_grant_queue_iter =
               leases_to_grant_queue.erase(leases_to_grant_queue_iter);
         }
@@ -536,7 +548,7 @@ bool LocalLeaseManager::PoppedWorkerHandler(
     const rpc::Address &owner_address,
     const std::string &runtime_env_setup_error_message) {
   const auto &reply = work->reply_;
-  const auto &callback = work->callback_;
+  const auto &send_reply_callback = work->send_reply_callback_;
   const bool canceled = work->GetState() == internal::WorkStatus::CANCELLED;
   const auto &lease = work->lease_;
   bool granted = false;
@@ -650,7 +662,12 @@ bool LocalLeaseManager::PoppedWorkerHandler(
     RAY_LOG(DEBUG) << "Granting lease " << lease_id << " to worker "
                    << worker->WorkerId();
 
-    Grant(worker, leased_workers_, work->allocated_instances_, lease, reply, callback);
+    Grant(worker,
+          leased_workers_,
+          work->allocated_instances_,
+          lease,
+          reply,
+          send_reply_callback);
     erase_from_leases_to_grant_queue_fn(work, scheduling_class);
     granted = true;
   }
@@ -660,11 +677,11 @@ bool LocalLeaseManager::PoppedWorkerHandler(
 
 void LocalLeaseManager::Spillback(const NodeID &spillback_to,
                                   const std::shared_ptr<internal::Work> &work) {
-  auto send_reply_callback = work->callback_;
+  auto send_reply_callback = work->send_reply_callback_;
 
   if (work->grant_or_reject_) {
     work->reply_->set_rejected(true);
-    send_reply_callback();
+    send_reply_callback(Status::OK(), nullptr, nullptr);
     return;
   }
 
@@ -690,7 +707,7 @@ void LocalLeaseManager::Spillback(const NodeID &spillback_to,
   reply->mutable_retry_at_raylet_address()->set_port(node_info_ptr->node_manager_port());
   reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary());
 
-  send_reply_callback();
+  send_reply_callback(Status::OK(), nullptr, nullptr);
 }
 
 void LocalLeaseManager::LeasesUnblocked(const std::vector<LeaseID> &ready_ids) {
@@ -843,61 +860,52 @@ void LocalLeaseManager::ReleaseLeaseArgs(const LeaseID &lease_id) {
  }
 }
 
-namespace {
-void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
-                    rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
-                    const std::string &scheduling_failure_message) {
-  auto reply = work->reply_;
-  auto callback = work->callback_;
-  reply->set_canceled(true);
-  reply->set_failure_type(failure_type);
-  reply->set_scheduling_failure_message(scheduling_failure_message);
-  callback();
-}
-}  // namespace
-
-bool LocalLeaseManager::CancelLeases(
-    std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
-    rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
-    const std::string &scheduling_failure_message) {
-  bool tasks_cancelled = false;
+std::vector<std::shared_ptr<internal::Work>> LocalLeaseManager::CancelLeasesWithoutReply(
+    std::function<bool(const std::shared_ptr<internal::Work> &)> predicate) {
+  std::vector<std::shared_ptr<internal::Work>> cancelled_works;
 
   ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>(
       leases_to_grant_, [&](const std::shared_ptr<internal::Work> &work) {
        if (!predicate(work)) {
          return false;
        }
-        CancelLeaseToGrant(work, failure_type, scheduling_failure_message);
-        tasks_cancelled = true;
+        CancelLeaseToGrantWithoutReply(work);
+        cancelled_works.push_back(work);
        return true;
      });
 
   ray::erase_if<std::shared_ptr<internal::Work>>(
       waiting_lease_queue_, [&](const std::shared_ptr<internal::Work> &work) {
-        if (predicate(work)) {
-          ReplyCancelled(work, failure_type, scheduling_failure_message);
-          if (!work->lease_.GetLeaseSpecification().GetDependencies().empty()) {
-            lease_dependency_manager_.RemoveLeaseDependencies(
-                work->lease_.GetLeaseSpecification().LeaseId());
-          }
-          waiting_leases_index_.erase(work->lease_.GetLeaseSpecification().LeaseId());
-          tasks_cancelled = true;
-          return true;
-        } else {
+        if (!predicate(work)) {
          return false;
        }
+        if (!work->lease_.GetLeaseSpecification().GetDependencies().empty()) {
+          lease_dependency_manager_.RemoveLeaseDependencies(
+              work->lease_.GetLeaseSpecification().LeaseId());
+        }
+        waiting_leases_index_.erase(work->lease_.GetLeaseSpecification().LeaseId());
+        cancelled_works.push_back(work);
+        return true;
      });
 
-  return tasks_cancelled;
+  return cancelled_works;
 }
 
-void LocalLeaseManager::CancelLeaseToGrant(
-    const std::shared_ptr<internal::Work> &work,
+bool LocalLeaseManager::CancelLeases(
+    std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
     rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
    const std::string &scheduling_failure_message) {
+  auto cancelled_works = CancelLeasesWithoutReply(predicate);
+  for (const auto &work : cancelled_works) {
+    ReplyCancelled(work, failure_type, scheduling_failure_message);
+  }
+  return !cancelled_works.empty();
+}
+
+void LocalLeaseManager::CancelLeaseToGrantWithoutReply(
+    const std::shared_ptr<internal::Work> &work) {
   const LeaseID lease_id = work->lease_.GetLeaseSpecification().LeaseId();
   RAY_LOG(DEBUG) << "Canceling lease " << lease_id << " from leases_to_grant_queue.";
-  ReplyCancelled(work, failure_type, scheduling_failure_message);
   if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
    // We've already acquired resources so we need to release them.
    cluster_resource_scheduler_.GetLocalResourceManager().ReleaseWorkerResources(
@@ -962,7 +970,7 @@ void LocalLeaseManager::Grant(
     const std::shared_ptr<TaskResourceInstances> &allocated_instances,
     const RayLease &lease,
     rpc::RequestWorkerLeaseReply *reply,
-    std::function<void(void)> send_reply_callback) {
+    rpc::SendReplyCallback send_reply_callback) {
   const auto &lease_spec = lease.GetLeaseSpecification();
 
   if (lease_spec.IsActorCreationTask()) {
@@ -1011,7 +1019,7 @@ void LocalLeaseManager::Grant(
    }
  }
  // Send the result back.
-  send_reply_callback();
+  send_reply_callback(Status::OK(), nullptr, nullptr);
 }
 
 void LocalLeaseManager::ClearWorkerBacklog(const WorkerID &worker_id) {
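Two things to note in this file: CancelLeases is now a thin wrapper that calls the new CancelLeasesWithoutReply and then sends the failure replies itself, and the grant/spillback reply callbacks change from a plain `std::function<void(void)>` to `rpc::SendReplyCallback`, which is why every `send_reply_callback()` call becomes `send_reply_callback(Status::OK(), nullptr, nullptr)`. The sketch below illustrates only that callback generalization; `Status` and `SendReplyCallback` here are simplified stand-ins for types in Ray's rpc layer (not shown in this diff), assumed to carry a status plus two optional completion hooks.

```cpp
#include <functional>
#include <iostream>

// Illustrative stand-ins; Ray's real Status and rpc::SendReplyCallback live in
// the rpc layer and are only assumed to have roughly this shape.
struct Status {
  static Status OK() { return {}; }
};
using SendReplyCallback =
    std::function<void(Status, std::function<void()>, std::function<void()>)>;

int main() {
  // Before: a reply callback with no arguments.
  std::function<void()> old_cb = [] { std::cout << "reply sent\n"; };
  old_cb();

  // After: the callback carries a status plus optional success/failure hooks,
  // so call sites pass Status::OK() and nullptr where no hooks are needed.
  SendReplyCallback new_cb = [](Status, std::function<void()> ok,
                                std::function<void()> fail) {
    std::cout << "reply sent with status\n";
    (void)ok;
    (void)fail;
  };
  new_cb(Status::OK(), nullptr, nullptr);
  return 0;
}
```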

src/ray/raylet/local_lease_manager.h

Lines changed: 5 additions & 6 deletions
@@ -122,6 +122,9 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
       rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
       const std::string &scheduling_failure_message) override;
 
+  std::vector<std::shared_ptr<internal::Work>> CancelLeasesWithoutReply(
+      std::function<bool(const std::shared_ptr<internal::Work> &)> predicate) override;
+
   /// Return with an exemplar if any leases are pending resource acquisition.
   ///
   /// \param[in,out] num_pending_actor_creation: Number of pending actor creation leases.
@@ -202,11 +205,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
       const std::string &runtime_env_setup_error_message);
 
   /// Cancels a lease in leases_to_grant_. Does not remove it from leases_to_grant_.
-  void CancelLeaseToGrant(
-      const std::shared_ptr<internal::Work> &work,
-      rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
-          rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
-      const std::string &scheduling_failure_message = "");
+  void CancelLeaseToGrantWithoutReply(const std::shared_ptr<internal::Work> &work);
 
   /// Attempts to grant all leases which are ready to run. A lease
   /// will be granted if it is on `leases_to_grant_` and there are still
@@ -250,7 +249,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
             const std::shared_ptr<TaskResourceInstances> &allocated_instances,
             const RayLease &lease,
             rpc::RequestWorkerLeaseReply *reply,
-            std::function<void(void)> send_reply_callback);
+            rpc::SendReplyCallback send_reply_callback);
 
   void Spillback(const NodeID &spillback_to, const std::shared_ptr<internal::Work> &work);
 

src/ray/raylet/node_manager.cc

Lines changed: 16 additions & 0 deletions
@@ -2083,7 +2083,23 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
     reply->set_is_accepted(true);
   }
 
+  const bool is_drain_accepted = reply->is_accepted();
   send_reply_callback(Status::OK(), nullptr, nullptr);
+
+  if (is_drain_accepted) {
+    // Fail fast on the leases in the local lease manager
+    // and add them back to the cluster lease manager so a new node
+    // can be selected by the scheduler.
+    auto cancelled_works = local_lease_manager_.CancelLeasesWithoutReply(
+        [&](const std::shared_ptr<internal::Work> &work) { return true; });
+    for (const auto &work : cancelled_works) {
+      cluster_lease_manager_.QueueAndScheduleLease(work->lease_,
+                                                   work->grant_or_reject_,
+                                                   work->is_selected_based_on_locality_,
+                                                   work->reply_,
+                                                   work->send_reply_callback_);
+    }
+  }
 }
 
 void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request,

src/ray/raylet/node_manager.h

Lines changed: 5 additions & 5 deletions
@@ -302,6 +302,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
       rpc::CancelWorkerLeaseReply *reply,
       rpc::SendReplyCallback send_reply_callback) override;
 
+  /// Handle a `DrainRaylet` request.
+  void HandleDrainRaylet(rpc::DrainRayletRequest request,
+                         rpc::DrainRayletReply *reply,
+                         rpc::SendReplyCallback send_reply_callback) override;
+
  private:
   FRIEND_TEST(NodeManagerStaticTest, TestHandleReportWorkerBacklog);
 
@@ -589,11 +594,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
       rpc::ShutdownRayletReply *reply,
       rpc::SendReplyCallback send_reply_callback) override;
 
-  /// Handle a `DrainRaylet` request.
-  void HandleDrainRaylet(rpc::DrainRayletRequest request,
-                         rpc::DrainRayletReply *reply,
-                         rpc::SendReplyCallback send_reply_callback) override;
-
   void HandleIsLocalWorkerDead(rpc::IsLocalWorkerDeadRequest request,
                                rpc::IsLocalWorkerDeadReply *reply,
                                rpc::SendReplyCallback send_reply_callback) override;

src/ray/raylet/scheduling/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -36,6 +36,7 @@ ray_cc_library(
         "//src/ray/common:lease",
         "//src/ray/common/scheduling:cluster_resource_data",
         "//src/ray/protobuf:node_manager_cc_proto",
+        "//src/ray/rpc:rpc_callback_types",
     ],
 )
