@@ -36,11 +36,13 @@ namespace {
3636void ReplyCancelled (const std::shared_ptr<internal::Work> &work,
3737 rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
3838 const std::string &scheduling_failure_message) {
39- auto reply = work->reply_ ;
40- reply->set_canceled (true );
41- reply->set_failure_type (failure_type);
42- reply->set_scheduling_failure_message (scheduling_failure_message);
43- work->send_reply_callback_ (Status::OK (), nullptr , nullptr );
39+ for (const auto &reply_callback : work->reply_callbacks_ ) {
40+ auto reply = reply_callback.reply_ ;
41+ reply->set_canceled (true );
42+ reply->set_failure_type (failure_type);
43+ reply->set_scheduling_failure_message (scheduling_failure_message);
44+ reply_callback.send_reply_callback_ (Status::OK (), nullptr , nullptr );
45+ }
4446}
4547} // namespace
4648
@@ -547,8 +549,7 @@ bool LocalLeaseManager::PoppedWorkerHandler(
547549 bool is_detached_actor,
548550 const rpc::Address &owner_address,
549551 const std::string &runtime_env_setup_error_message) {
550- const auto &reply = work->reply_ ;
551- const auto &send_reply_callback = work->send_reply_callback_ ;
552+ const auto &reply_callbacks = work->reply_callbacks_ ;
552553 const bool canceled = work->GetState () == internal::WorkStatus::CANCELLED;
553554 const auto &lease = work->lease_ ;
554555 bool granted = false ;
@@ -662,12 +663,7 @@ bool LocalLeaseManager::PoppedWorkerHandler(
662663 RAY_LOG (DEBUG) << " Granting lease " << lease_id << " to worker "
663664 << worker->WorkerId ();
664665
665- Grant (worker,
666- leased_workers_,
667- work->allocated_instances_ ,
668- lease,
669- reply,
670- send_reply_callback);
666+ Grant (worker, leased_workers_, work->allocated_instances_ , lease, reply_callbacks);
671667 erase_from_leases_to_grant_queue_fn (work, scheduling_class);
672668 granted = true ;
673669 }
@@ -677,11 +673,11 @@ bool LocalLeaseManager::PoppedWorkerHandler(
677673
678674void LocalLeaseManager::Spillback (const NodeID &spillback_to,
679675 const std::shared_ptr<internal::Work> &work) {
680- auto send_reply_callback = work->send_reply_callback_ ;
681-
682676 if (work->grant_or_reject_ ) {
683- work->reply_ ->set_rejected (true );
684- send_reply_callback (Status::OK (), nullptr , nullptr );
677+ for (const auto &reply_callback : work->reply_callbacks_ ) {
678+ reply_callback.reply_ ->set_rejected (true );
679+ reply_callback.send_reply_callback_ (Status::OK (), nullptr , nullptr );
680+ }
685681 return ;
686682 }
687683
@@ -701,13 +697,15 @@ void LocalLeaseManager::Spillback(const NodeID &spillback_to,
701697 RAY_CHECK (node_info_ptr)
702698 << " Spilling back to a node manager, but no GCS info found for node "
703699 << spillback_to;
704- auto reply = work->reply_ ;
705- reply->mutable_retry_at_raylet_address ()->set_ip_address (
706- node_info_ptr->node_manager_address ());
707- reply->mutable_retry_at_raylet_address ()->set_port (node_info_ptr->node_manager_port ());
708- reply->mutable_retry_at_raylet_address ()->set_node_id (spillback_to.Binary ());
709-
710- send_reply_callback (Status::OK (), nullptr , nullptr );
700+ for (const auto &reply_callback : work->reply_callbacks_ ) {
701+ auto reply = reply_callback.reply_ ;
702+ reply->mutable_retry_at_raylet_address ()->set_ip_address (
703+ node_info_ptr->node_manager_address ());
704+ reply->mutable_retry_at_raylet_address ()->set_port (
705+ node_info_ptr->node_manager_port ());
706+ reply->mutable_retry_at_raylet_address ()->set_node_id (spillback_to.Binary ());
707+ reply_callback.send_reply_callback_ (Status::OK (), nullptr , nullptr );
708+ }
711709}
712710
713711void LocalLeaseManager::LeasesUnblocked (const std::vector<LeaseID> &ready_ids) {
@@ -969,8 +967,7 @@ void LocalLeaseManager::Grant(
969967 absl::flat_hash_map<LeaseID, std::shared_ptr<WorkerInterface>> &leased_workers,
970968 const std::shared_ptr<TaskResourceInstances> &allocated_instances,
971969 const RayLease &lease,
972- rpc::RequestWorkerLeaseReply *reply,
973- rpc::SendReplyCallback send_reply_callback) {
970+ const std::vector<internal::ReplyCallback> &reply_callbacks) {
974971 const auto &lease_spec = lease.GetLeaseSpecification ();
975972
976973 if (lease_spec.IsActorCreationTask ()) {
@@ -982,11 +979,14 @@ void LocalLeaseManager::Grant(
982979 worker->GrantLease (lease);
983980
984981 // Pass the contact info of the worker to use.
985- reply->set_worker_pid (worker->GetProcess ().GetId ());
986- reply->mutable_worker_address ()->set_ip_address (worker->IpAddress ());
987- reply->mutable_worker_address ()->set_port (worker->Port ());
988- reply->mutable_worker_address ()->set_worker_id (worker->WorkerId ().Binary ());
989- reply->mutable_worker_address ()->set_node_id (self_node_id_.Binary ());
982+ for (const auto &reply_callback : reply_callbacks) {
983+ reply_callback.reply_ ->set_worker_pid (worker->GetProcess ().GetId ());
984+ reply_callback.reply_ ->mutable_worker_address ()->set_ip_address (worker->IpAddress ());
985+ reply_callback.reply_ ->mutable_worker_address ()->set_port (worker->Port ());
986+ reply_callback.reply_ ->mutable_worker_address ()->set_worker_id (
987+ worker->WorkerId ().Binary ());
988+ reply_callback.reply_ ->mutable_worker_address ()->set_node_id (self_node_id_.Binary ());
989+ }
990990
991991 RAY_CHECK (!leased_workers.contains (lease_spec.LeaseId ()));
992992 leased_workers[lease_spec.LeaseId ()] = worker;
@@ -1000,26 +1000,29 @@ void LocalLeaseManager::Grant(
10001000 } else {
10011001 allocated_resources = worker->GetAllocatedInstances ();
10021002 }
1003- ::ray::rpc::ResourceMapEntry *resource;
10041003 for (auto &resource_id : allocated_resources->ResourceIds ()) {
1005- bool first = true ; // Set resource name only if at least one of its
1006- // instances has available capacity.
10071004 auto instances = allocated_resources->Get (resource_id);
1008- for (size_t inst_idx = 0 ; inst_idx < instances.size (); inst_idx++) {
1009- if (instances[inst_idx] > 0 .) {
1010- if (first) {
1011- resource = reply->add_resource_mapping ();
1012- resource->set_name (resource_id.Binary ());
1013- first = false ;
1005+ for (const auto &reply_callback : reply_callbacks) {
1006+ ::ray::rpc::ResourceMapEntry *resource = nullptr ;
1007+ for (size_t inst_idx = 0 ; inst_idx < instances.size (); inst_idx++) {
1008+ if (instances[inst_idx] > 0 .) {
1009+ // Set resource name only if at least one of its instances has available
1010+ // capacity.
1011+ if (resource == nullptr ) {
1012+ resource = reply_callback.reply_ ->add_resource_mapping ();
1013+ resource->set_name (resource_id.Binary ());
1014+ }
1015+ auto rid = resource->add_resource_ids ();
1016+ rid->set_index (inst_idx);
1017+ rid->set_quantity (instances[inst_idx].Double ());
10141018 }
1015- auto rid = resource->add_resource_ids ();
1016- rid->set_index (inst_idx);
1017- rid->set_quantity (instances[inst_idx].Double ());
10181019 }
10191020 }
10201021 }
1021- // Send the result back.
1022- send_reply_callback (Status::OK (), nullptr , nullptr );
1022+ // Send the result back to the clients.
1023+ for (const auto &reply_callback : reply_callbacks) {
1024+ reply_callback.send_reply_callback_ (Status::OK (), nullptr , nullptr );
1025+ }
10231026}
10241027
10251028void LocalLeaseManager::ClearWorkerBacklog (const WorkerID &worker_id) {
@@ -1258,5 +1261,41 @@ void LocalLeaseManager::DebugStr(std::stringstream &buffer) const {
12581261 }
12591262}
12601263
1264+ bool LocalLeaseManager::IsLeaseQueued (const SchedulingClass &scheduling_class,
1265+ const LeaseID &lease_id) const {
1266+ if (waiting_leases_index_.contains (lease_id)) {
1267+ return true ;
1268+ }
1269+ auto leases_to_grant_it = leases_to_grant_.find (scheduling_class);
1270+ if (leases_to_grant_it != leases_to_grant_.end ()) {
1271+ for (const auto &work : leases_to_grant_it->second ) {
1272+ if (work->lease_ .GetLeaseSpecification ().LeaseId () == lease_id) {
1273+ return true ;
1274+ }
1275+ }
1276+ }
1277+ return false ;
1278+ }
1279+
1280+ bool LocalLeaseManager::AddReplyCallback (const SchedulingClass &scheduling_class,
1281+ const LeaseID &lease_id,
1282+ rpc::SendReplyCallback send_reply_callback,
1283+ rpc::RequestWorkerLeaseReply *reply) {
1284+ if (leases_to_grant_.contains (scheduling_class)) {
1285+ for (const auto &work : leases_to_grant_[scheduling_class]) {
1286+ if (work->lease_ .GetLeaseSpecification ().LeaseId () == lease_id) {
1287+ work->reply_callbacks_ .emplace_back (std::move (send_reply_callback), reply);
1288+ return true ;
1289+ }
1290+ }
1291+ }
1292+ auto it = waiting_leases_index_.find (lease_id);
1293+ if (it != waiting_leases_index_.end ()) {
1294+ (*it->second )->reply_callbacks_ .emplace_back (std::move (send_reply_callback), reply);
1295+ return true ;
1296+ }
1297+ return false ;
1298+ }
1299+
12611300} // namespace raylet
12621301} // namespace ray
0 commit comments