diff --git a/src/ray/ray_syncer/ray_syncer_bidi_reactor.h b/src/ray/ray_syncer/ray_syncer_bidi_reactor.h index 718382ea9b96..44464291ad08 100644 --- a/src/ray/ray_syncer/ray_syncer_bidi_reactor.h +++ b/src/ray/ray_syncer/ray_syncer_bidi_reactor.h @@ -56,14 +56,14 @@ using ray::rpc::syncer::ResourceViewSyncMessage; /// /// /// For the client side: -/// +------------+ +-------------+ +------------+ gRPC error or disconnected +--------+ -/// | StartCall | ---> | StartRead | <---> | OnReadDone | ----------------------------> | OnDone | -/// +------------+ +-------------+ +------------+ +--------+ -/// | ^ -/// | | -/// v | -/// +------------+ +-------------+ gRPC error or disconnected | -/// | StartWrite | <--> | OnWriteDone | -------------------------------------------------------+ +/// +------------+ +-------------+ +------------+ gRPC error or ALL incoming data read +--------+ +/// | StartCall | ---> | StartRead | <---> | OnReadDone | --------------------------------------> | OnDone | +/// +------------+ +-------------+ +------------+ +--------+ +/// | ^ +/// | | +/// v | +/// +------------+ +-------------+ gRPC error or disconnected | +/// | StartWrite | <--> | OnWriteDone | ------------------------------------------------------------------+ /// +------------+ +-------------+ // clang-format on class RaySyncerBidiReactor { diff --git a/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h b/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h index 2de621c47a5b..784fdc704c9a 100644 --- a/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h +++ b/src/ray/ray_syncer/ray_syncer_bidi_reactor_base.h @@ -171,14 +171,13 @@ class RaySyncerBidiReactorBase : public RaySyncerBidiReactor, public T { void OnReadDone(bool ok) override { io_context_.dispatch( - [this, - ok, - disconnected = IsDisconnected(), - msg = std::move(receiving_message_)]() mutable { - if (*disconnected) { - return; - } - + [this, ok, msg = std::move(receiving_message_)]() mutable { + // NOTE: According to the grpc callback streaming api best practices 3.) + // https://grpc.io/docs/languages/cpp/best_practices/#callback-streaming-api + // The client must read all incoming data i.e. until OnReadDone(ok = false) + // happens for OnDone to be called. Hence even if disconnected_ is true, we + // still need to allow OnReadDone to repeatedly execute until StartReadData has + // consumed all the data for OnDone to be called. if (!ok) { RAY_LOG_EVERY_MS(INFO, 1000) << "Failed to read a message from node: " << NodeID::FromBinary(GetRemoteNodeID());