Skip to content

Commit

Permalink
[fleet_executor] Add retry to the message bus's send. Use unique_lock…
Browse files Browse the repository at this point in the history
… instead of calling lock(). (#37087)

* use unique lock, add retry

* bug fix
  • Loading branch information
FeixLiu authored Nov 10, 2021
1 parent b4e2543 commit f5caf9c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
3 changes: 1 addition & 2 deletions paddle/fluid/distributed/fleet_executor/interceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ bool Interceptor::EnqueueRemoteInterceptorMessage(
// Called by Carrier, enqueue an InterceptorMessage to remote mailbox
VLOG(3) << "Enqueue message: " << interceptor_message.message_type()
<< " into " << interceptor_id_ << "'s remote mailbox.";
remote_mailbox_mutex_.lock();
std::unique_lock<std::mutex> lock(remote_mailbox_mutex_);
remote_mailbox_.push(interceptor_message);
remote_mailbox_mutex_.unlock();
return true;
}

Expand Down
25 changes: 18 additions & 7 deletions paddle/fluid/distributed/fleet_executor/message_bus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,25 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) {
int64_t src_id = interceptor_message.src_id();
int64_t dst_id = interceptor_message.dst_id();
if (IsSameRank(src_id, dst_id)) {
VLOG(3) << "Send a message from: " << src_id << " to " << dst_id
<< " within a same rank.";
VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id
<< ", which are same ranks.";
return SendIntraRank(interceptor_message);
} else {
VLOG(3) << "Send a message from: " << src_id << " to " << dst_id
<< " between different ranks.";
VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id
<< ", which are different ranks.";
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
return SendInterRank(interceptor_message);
int retry_time = 0; // message bus will retry sending for 10 times
while (retry_time < 10) {
++retry_time;
if (SendInterRank(interceptor_message)) {
VLOG(3) << "Message bus sends inter rank successfully with "
<< retry_time << " times retries.";
return true;
}
}
VLOG(3) << "Message bus sends inter rank fail after 10 times retries.";
return false;
#else
PADDLE_THROW(platform::errors::Unavailable(
"Fleet executor does not support sending message between different "
Expand Down Expand Up @@ -134,6 +144,7 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) {
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.connect_timeout_ms = 1000;
options.timeout_ms = 1000;
options.max_retry = 5;
PADDLE_ENFORCE_EQ(
Expand All @@ -149,11 +160,11 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) {
VLOG(3) << "Message bus: brpc sends success.";
return true;
} else {
VLOG(3) << "Message bus: InterceptorMessageService error.";
VLOG(4) << "Message bus: InterceptorMessageService error.";
return false;
}
} else {
VLOG(3) << "Message bus: brpc sends failed with error text: "
VLOG(4) << "Message bus: brpc sends failed with error text: "
<< ctrl.ErrorText();
return false;
}
Expand Down

0 comments on commit f5caf9c

Please sign in to comment.