Skip to content

Commit 168cdc6

Browse files
authored
[core] Fix idempotency issues in RequestWorkerLease for scheduled leases (#58265)
## Description > Briefly describe what this PR accomplishes and why it's needed. Using the iptables script created in #58241, we found a bug in RequestWorkerLease where a RAY_CHECK was being triggered here: https://github.com/ray-project/ray/blob/66c08b47a195bcfac6878a234dc804142e488fc2/src/ray/raylet/lease_dependency_manager.cc#L222-L223 The issue is that transient network errors can happen ANYTIME, including when the server logic is executing and has not yet replied back to the client. Our original testing framework used an env variable to drop the request or reply when it was being sent, hence this was missed. The issue specifically is that RequestWorkerLease could be in the process of pulling the lease dependencies to its local plasma store, and the retry can arrive, triggering this check. Created a C++ unit test that specifically triggers this RAY_CHECK without this change and passes with it. I decided to store the callbacks instead of replacing the older one with the new one due to the possibility of message reordering where the new one could arrive before the old one. --------- Signed-off-by: joshlee <[email protected]>
1 parent 62d23ff commit 168cdc6

13 files changed

+830
-210
lines changed

src/mock/ray/raylet/local_lease_manager.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ class MockLocalLeaseManager : public LocalLeaseManagerInterface {
7979
MOCK_METHOD(size_t, GetNumLeaseSpilled, (), (const, override));
8080
MOCK_METHOD(size_t, GetNumWaitingLeaseSpilled, (), (const, override));
8181
MOCK_METHOD(size_t, GetNumUnschedulableLeaseSpilled, (), (const, override));
82+
MOCK_METHOD(bool,
83+
IsLeaseQueued,
84+
(const SchedulingClass &scheduling_class, const LeaseID &lease_id),
85+
(const, override));
86+
MOCK_METHOD(bool,
87+
AddReplyCallback,
88+
(const SchedulingClass &scheduling_class,
89+
const LeaseID &lease_id,
90+
rpc::SendReplyCallback send_reply_callback,
91+
rpc::RequestWorkerLeaseReply *reply),
92+
(override));
8293
};
8394

8495
} // namespace ray::raylet

src/ray/gcs/gcs_actor_scheduler.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,12 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr<GcsActor> actor) {
103103
RayLease lease(
104104
actor->GetLeaseSpecification(),
105105
owner_node.has_value() ? actor->GetOwnerNodeID().Binary() : std::string());
106-
cluster_lease_manager_.QueueAndScheduleLease(std::move(lease),
107-
/*grant_or_reject=*/false,
108-
/*is_selected_based_on_locality=*/false,
109-
/*reply=*/reply.get(),
110-
send_reply_callback);
106+
cluster_lease_manager_.QueueAndScheduleLease(
107+
std::move(lease),
108+
/*grant_or_reject=*/false,
109+
/*is_selected_based_on_locality=*/false,
110+
{ray::raylet::internal::ReplyCallback(std::move(send_reply_callback),
111+
reply.get())});
111112
}
112113

113114
void GcsActorScheduler::ScheduleByRaylet(std::shared_ptr<GcsActor> actor) {

src/ray/raylet/local_lease_manager.cc

Lines changed: 84 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ namespace {
3636
void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
3737
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
3838
const std::string &scheduling_failure_message) {
39-
auto reply = work->reply_;
40-
reply->set_canceled(true);
41-
reply->set_failure_type(failure_type);
42-
reply->set_scheduling_failure_message(scheduling_failure_message);
43-
work->send_reply_callback_(Status::OK(), nullptr, nullptr);
39+
for (const auto &reply_callback : work->reply_callbacks_) {
40+
auto reply = reply_callback.reply_;
41+
reply->set_canceled(true);
42+
reply->set_failure_type(failure_type);
43+
reply->set_scheduling_failure_message(scheduling_failure_message);
44+
reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr);
45+
}
4446
}
4547
} // namespace
4648

@@ -547,8 +549,7 @@ bool LocalLeaseManager::PoppedWorkerHandler(
547549
bool is_detached_actor,
548550
const rpc::Address &owner_address,
549551
const std::string &runtime_env_setup_error_message) {
550-
const auto &reply = work->reply_;
551-
const auto &send_reply_callback = work->send_reply_callback_;
552+
const auto &reply_callbacks = work->reply_callbacks_;
552553
const bool canceled = work->GetState() == internal::WorkStatus::CANCELLED;
553554
const auto &lease = work->lease_;
554555
bool granted = false;
@@ -662,12 +663,7 @@ bool LocalLeaseManager::PoppedWorkerHandler(
662663
RAY_LOG(DEBUG) << "Granting lease " << lease_id << " to worker "
663664
<< worker->WorkerId();
664665

665-
Grant(worker,
666-
leased_workers_,
667-
work->allocated_instances_,
668-
lease,
669-
reply,
670-
send_reply_callback);
666+
Grant(worker, leased_workers_, work->allocated_instances_, lease, reply_callbacks);
671667
erase_from_leases_to_grant_queue_fn(work, scheduling_class);
672668
granted = true;
673669
}
@@ -677,11 +673,11 @@ bool LocalLeaseManager::PoppedWorkerHandler(
677673

678674
void LocalLeaseManager::Spillback(const NodeID &spillback_to,
679675
const std::shared_ptr<internal::Work> &work) {
680-
auto send_reply_callback = work->send_reply_callback_;
681-
682676
if (work->grant_or_reject_) {
683-
work->reply_->set_rejected(true);
684-
send_reply_callback(Status::OK(), nullptr, nullptr);
677+
for (const auto &reply_callback : work->reply_callbacks_) {
678+
reply_callback.reply_->set_rejected(true);
679+
reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr);
680+
}
685681
return;
686682
}
687683

@@ -701,13 +697,15 @@ void LocalLeaseManager::Spillback(const NodeID &spillback_to,
701697
RAY_CHECK(node_info_ptr)
702698
<< "Spilling back to a node manager, but no GCS info found for node "
703699
<< spillback_to;
704-
auto reply = work->reply_;
705-
reply->mutable_retry_at_raylet_address()->set_ip_address(
706-
node_info_ptr->node_manager_address());
707-
reply->mutable_retry_at_raylet_address()->set_port(node_info_ptr->node_manager_port());
708-
reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary());
709-
710-
send_reply_callback(Status::OK(), nullptr, nullptr);
700+
for (const auto &reply_callback : work->reply_callbacks_) {
701+
auto reply = reply_callback.reply_;
702+
reply->mutable_retry_at_raylet_address()->set_ip_address(
703+
node_info_ptr->node_manager_address());
704+
reply->mutable_retry_at_raylet_address()->set_port(
705+
node_info_ptr->node_manager_port());
706+
reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary());
707+
reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr);
708+
}
711709
}
712710

713711
void LocalLeaseManager::LeasesUnblocked(const std::vector<LeaseID> &ready_ids) {
@@ -969,8 +967,7 @@ void LocalLeaseManager::Grant(
969967
absl::flat_hash_map<LeaseID, std::shared_ptr<WorkerInterface>> &leased_workers,
970968
const std::shared_ptr<TaskResourceInstances> &allocated_instances,
971969
const RayLease &lease,
972-
rpc::RequestWorkerLeaseReply *reply,
973-
rpc::SendReplyCallback send_reply_callback) {
970+
const std::vector<internal::ReplyCallback> &reply_callbacks) {
974971
const auto &lease_spec = lease.GetLeaseSpecification();
975972

976973
if (lease_spec.IsActorCreationTask()) {
@@ -982,11 +979,14 @@ void LocalLeaseManager::Grant(
982979
worker->GrantLease(lease);
983980

984981
// Pass the contact info of the worker to use.
985-
reply->set_worker_pid(worker->GetProcess().GetId());
986-
reply->mutable_worker_address()->set_ip_address(worker->IpAddress());
987-
reply->mutable_worker_address()->set_port(worker->Port());
988-
reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary());
989-
reply->mutable_worker_address()->set_node_id(self_node_id_.Binary());
982+
for (const auto &reply_callback : reply_callbacks) {
983+
reply_callback.reply_->set_worker_pid(worker->GetProcess().GetId());
984+
reply_callback.reply_->mutable_worker_address()->set_ip_address(worker->IpAddress());
985+
reply_callback.reply_->mutable_worker_address()->set_port(worker->Port());
986+
reply_callback.reply_->mutable_worker_address()->set_worker_id(
987+
worker->WorkerId().Binary());
988+
reply_callback.reply_->mutable_worker_address()->set_node_id(self_node_id_.Binary());
989+
}
990990

991991
RAY_CHECK(!leased_workers.contains(lease_spec.LeaseId()));
992992
leased_workers[lease_spec.LeaseId()] = worker;
@@ -1000,26 +1000,29 @@ void LocalLeaseManager::Grant(
10001000
} else {
10011001
allocated_resources = worker->GetAllocatedInstances();
10021002
}
1003-
::ray::rpc::ResourceMapEntry *resource;
10041003
for (auto &resource_id : allocated_resources->ResourceIds()) {
1005-
bool first = true; // Set resource name only if at least one of its
1006-
// instances has available capacity.
10071004
auto instances = allocated_resources->Get(resource_id);
1008-
for (size_t inst_idx = 0; inst_idx < instances.size(); inst_idx++) {
1009-
if (instances[inst_idx] > 0.) {
1010-
if (first) {
1011-
resource = reply->add_resource_mapping();
1012-
resource->set_name(resource_id.Binary());
1013-
first = false;
1005+
for (const auto &reply_callback : reply_callbacks) {
1006+
::ray::rpc::ResourceMapEntry *resource = nullptr;
1007+
for (size_t inst_idx = 0; inst_idx < instances.size(); inst_idx++) {
1008+
if (instances[inst_idx] > 0.) {
1009+
// Set resource name only if at least one of its instances has available
1010+
// capacity.
1011+
if (resource == nullptr) {
1012+
resource = reply_callback.reply_->add_resource_mapping();
1013+
resource->set_name(resource_id.Binary());
1014+
}
1015+
auto rid = resource->add_resource_ids();
1016+
rid->set_index(inst_idx);
1017+
rid->set_quantity(instances[inst_idx].Double());
10141018
}
1015-
auto rid = resource->add_resource_ids();
1016-
rid->set_index(inst_idx);
1017-
rid->set_quantity(instances[inst_idx].Double());
10181019
}
10191020
}
10201021
}
1021-
// Send the result back.
1022-
send_reply_callback(Status::OK(), nullptr, nullptr);
1022+
// Send the result back to the clients.
1023+
for (const auto &reply_callback : reply_callbacks) {
1024+
reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr);
1025+
}
10231026
}
10241027

10251028
void LocalLeaseManager::ClearWorkerBacklog(const WorkerID &worker_id) {
@@ -1258,5 +1261,41 @@ void LocalLeaseManager::DebugStr(std::stringstream &buffer) const {
12581261
}
12591262
}
12601263

1264+
bool LocalLeaseManager::IsLeaseQueued(const SchedulingClass &scheduling_class,
1265+
const LeaseID &lease_id) const {
1266+
if (waiting_leases_index_.contains(lease_id)) {
1267+
return true;
1268+
}
1269+
auto leases_to_grant_it = leases_to_grant_.find(scheduling_class);
1270+
if (leases_to_grant_it != leases_to_grant_.end()) {
1271+
for (const auto &work : leases_to_grant_it->second) {
1272+
if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
1273+
return true;
1274+
}
1275+
}
1276+
}
1277+
return false;
1278+
}
1279+
1280+
bool LocalLeaseManager::AddReplyCallback(const SchedulingClass &scheduling_class,
1281+
const LeaseID &lease_id,
1282+
rpc::SendReplyCallback send_reply_callback,
1283+
rpc::RequestWorkerLeaseReply *reply) {
1284+
if (leases_to_grant_.contains(scheduling_class)) {
1285+
for (const auto &work : leases_to_grant_[scheduling_class]) {
1286+
if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
1287+
work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
1288+
return true;
1289+
}
1290+
}
1291+
}
1292+
auto it = waiting_leases_index_.find(lease_id);
1293+
if (it != waiting_leases_index_.end()) {
1294+
(*it->second)->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
1295+
return true;
1296+
}
1297+
return false;
1298+
}
1299+
12611300
} // namespace raylet
12621301
} // namespace ray

src/ray/raylet/local_lease_manager.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
189189
return num_unschedulable_lease_spilled_;
190190
}
191191

192+
bool IsLeaseQueued(const SchedulingClass &scheduling_class,
193+
const LeaseID &lease_id) const override;
194+
195+
bool AddReplyCallback(const SchedulingClass &scheduling_class,
196+
const LeaseID &lease_id,
197+
rpc::SendReplyCallback send_reply_callback,
198+
rpc::RequestWorkerLeaseReply *reply) override;
199+
192200
private:
193201
struct SchedulingClassInfo;
194202

@@ -248,8 +256,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
248256
absl::flat_hash_map<LeaseID, std::shared_ptr<WorkerInterface>> &leased_workers_,
249257
const std::shared_ptr<TaskResourceInstances> &allocated_instances,
250258
const RayLease &lease,
251-
rpc::RequestWorkerLeaseReply *reply,
252-
rpc::SendReplyCallback send_reply_callback);
259+
const std::vector<internal::ReplyCallback> &reply_callbacks);
253260

254261
void Spillback(const NodeID &spillback_to, const std::shared_ptr<internal::Work> &work);
255262

src/ray/raylet/node_manager.cc

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1806,7 +1806,7 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques
18061806
rpc::SendReplyCallback send_reply_callback) {
18071807
auto lease_id = LeaseID::FromBinary(request.lease_spec().lease_id());
18081808
// If the lease is already granted, this is a retry and forward the address of the
1809-
// already leased worker to use.
1809+
// already leased worker to use
18101810
if (leased_workers_.contains(lease_id)) {
18111811
const auto &worker = leased_workers_[lease_id];
18121812
RAY_LOG(DEBUG) << "Lease " << lease_id
@@ -1844,9 +1844,6 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques
18441844
actor_id = lease.GetLeaseSpecification().ActorId();
18451845
}
18461846

1847-
const auto &lease_spec = lease.GetLeaseSpecification();
1848-
worker_pool_.PrestartWorkers(lease_spec, request.backlog_size());
1849-
18501847
auto send_reply_callback_wrapper =
18511848
[this, is_actor_creation_task, actor_id, reply, send_reply_callback](
18521849
Status status, std::function<void()> success, std::function<void()> failure) {
@@ -1877,11 +1874,34 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques
18771874
send_reply_callback(status, std::move(success), std::move(failure));
18781875
};
18791876

1880-
cluster_lease_manager_.QueueAndScheduleLease(std::move(lease),
1881-
request.grant_or_reject(),
1882-
request.is_selected_based_on_locality(),
1883-
reply,
1884-
std::move(send_reply_callback_wrapper));
1877+
if (cluster_lease_manager_.IsLeaseQueued(
1878+
lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) {
1879+
RAY_CHECK(cluster_lease_manager_.AddReplyCallback(
1880+
lease.GetLeaseSpecification().GetSchedulingClass(),
1881+
lease_id,
1882+
std::move(send_reply_callback_wrapper),
1883+
reply));
1884+
return;
1885+
}
1886+
1887+
if (local_lease_manager_.IsLeaseQueued(
1888+
lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) {
1889+
RAY_CHECK(local_lease_manager_.AddReplyCallback(
1890+
lease.GetLeaseSpecification().GetSchedulingClass(),
1891+
lease_id,
1892+
std::move(send_reply_callback_wrapper),
1893+
reply));
1894+
return;
1895+
}
1896+
1897+
const auto &lease_spec = lease.GetLeaseSpecification();
1898+
worker_pool_.PrestartWorkers(lease_spec, request.backlog_size());
1899+
1900+
cluster_lease_manager_.QueueAndScheduleLease(
1901+
std::move(lease),
1902+
request.grant_or_reject(),
1903+
request.is_selected_based_on_locality(),
1904+
{internal::ReplyCallback(std::move(send_reply_callback_wrapper), reply)});
18851905
}
18861906

18871907
void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request,
@@ -2197,8 +2217,7 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
21972217
cluster_lease_manager_.QueueAndScheduleLease(work->lease_,
21982218
work->grant_or_reject_,
21992219
work->is_selected_based_on_locality_,
2200-
work->reply_,
2201-
work->send_reply_callback_);
2220+
work->reply_callbacks_);
22022221
}
22032222
}
22042223
}

0 commit comments

Comments
 (0)