-
Notifications
You must be signed in to change notification settings - Fork 5.5k
gRPC streaming client. #1054
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
gRPC streaming client. #1054
Changes from 2 commits
8e0a11d
d715cae
482b3bb
2a608de
36b0c68
52c0985
5ac1896
d21b76b
211836a
a43d5c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| #pragma once | ||
|
|
||
| #include <chrono> | ||
|
|
||
| #include "envoy/common/optional.h" | ||
| #include "envoy/common/pure.h" | ||
| #include "envoy/grpc/status.h" | ||
| #include "envoy/http/header_map.h" | ||
|
|
||
| #include "google/protobuf/descriptor.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Grpc { | ||
|
|
||
| /** | ||
| * An in-flight gRPC stream. | ||
| */ | ||
| template <class RequestType> class AsyncClientStream { | ||
| public: | ||
| virtual ~AsyncClientStream() {} | ||
|
|
||
| /** | ||
| * Send request message to the stream. | ||
| * @param request protobuf serializable message. | ||
| */ | ||
| virtual void sendMessage(const RequestType& request) PURE; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, an on done callback for send message is desired, else the client is not really async.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not possible in the Envoy filter model, we don't know when the message was actually sent.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, then is it a potentially blocking call? And when underlying buffer is full, this message will be blocked and we rely on this to get end to end flow control works? Need to be documented somewhere.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In Envoy, the send happens immediately from the client, down the filter chain. Then it sits in buffers and may be put on the wire at some arbitrary point in the future. I can add a comment to this effect. It's not blocking, nor do we have asynchronous send notification.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, this is a surprise to me. As the e2e flow control gets broken. Good news is @htuch already have a plan to fix it. |
||
|
|
||
| /** | ||
| * Close the stream locally. No further methods may be invoked on the | ||
| * stream object, but callbacks may still be received until the stream is closed remotely. | ||
| */ | ||
| virtual void close() PURE; | ||
|
|
||
| /** | ||
| * Close the stream locally and remotely (as needed). No further methods may be invoked on the | ||
| * stream object and no further callbacks will be invoked. | ||
| */ | ||
| virtual void reset() PURE; | ||
| }; | ||
|
|
||
| /** | ||
| * Notifies caller of async gRPC stream status. | ||
| * Note the gRPC stream is full-duplex, even if the local to remote stream has been ended by | ||
| * AsyncClientStream.close(), AsyncClientCallbacks can continue to receive events until the remote | ||
| * to local stream is closed (onRemoteClose), and vice versa. Once the stream is closed remotely, no | ||
| * further callbacks will be invoked. | ||
| */ | ||
| template <class ResponseType> class AsyncClientCallbacks { | ||
| public: | ||
| virtual ~AsyncClientCallbacks() {} | ||
|
|
||
| /** | ||
| * Called when populating the headers to send with initial metadata. | ||
| * @param metadata initial metadata reference. | ||
| */ | ||
| virtual void onCreateInitialMetadata(Http::HeaderMap& metadata) PURE; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "onCreate" is confusing here. The method name should be able to represent when this method is going to be invoked.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is before the initial metadata is sent, and provides an opportunity to fill in the metadata. This is similar to how |
||
|
|
||
| /** | ||
| * Called when initial metadata is recevied. | ||
| * @param metadata initial metadata reference. | ||
| */ | ||
| virtual void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) PURE; | ||
|
|
||
| /** | ||
| * Called when an async gRPC message is received. | ||
| * @param response the gRPC message. | ||
| */ | ||
| virtual void onReceiveMessage(std::unique_ptr<ResponseType>&& message) PURE; | ||
|
|
||
| /** | ||
| * Called when trailing metadata is recevied. | ||
| * @param metadata trailing metadata reference. | ||
| */ | ||
| virtual void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) PURE; | ||
|
|
||
| /** | ||
| * Called when the remote closes or an error occurs on the gRPC stream. The stream is | ||
| * considered remotely closed after this invocation and no further callbacks will be | ||
| * invoked. A non-Ok status implies that stream is also locally closed and that no | ||
| * further stream operations are permitted. | ||
| * @param status the gRPC status. | ||
| */ | ||
| virtual void onRemoteClose(Status::GrpcStatus status) PURE; | ||
| }; | ||
|
|
||
| /** | ||
| * Supports sending gRPC requests and receiving responses asynchronously. This can be used to | ||
| * implement either plain gRPC or streaming gRPC calls. | ||
| */ | ||
| template <class RequestType, class ResponseType> class AsyncClient { | ||
| public: | ||
| virtual ~AsyncClient() {} | ||
|
|
||
| /** | ||
| * Start a gRPC stream asynchronously. | ||
| * @param service_method protobuf descriptor of gRPC service method. | ||
| * @param callbacks the callbacks to be notified of stream status. | ||
| * @param timeout supplies the stream timeout, measured since when the frame with end_stream | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure off the top of my head how to fix this yet, but calling out that I'm not sure this definition of timeout makes sense for a streaming RPC. Not sure if we want to leave this here, or split interface somehow for streaming and unary?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was cribbing from the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We had previous discussion around this. The intended behavior for
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, I think it's fine for now, we can revisit later, we will only be using it with infinite timeout for the APIs. |
||
| * flag is sent until when the first frame is received. | ||
| * @return a stream handle or nullptr if no stream could be started. NOTE: In this case | ||
| * onRemoteClose() has already been called inline. The client owns the stream and | ||
| * the handle can be used to send more messages or finish the stream. It is expected that | ||
| * finish() is invoked by the caller to notify the client that the stream resources may | ||
| * be reclaimed. | ||
| */ | ||
| virtual AsyncClientStream<RequestType>* | ||
| start(const google::protobuf::MethodDescriptor& service_method, | ||
| AsyncClientCallbacks<ResponseType>& callbacks, | ||
| const Optional<std::chrono::milliseconds>& timeout) PURE; | ||
| }; | ||
|
|
||
| } // Grpc | ||
| } // Envoy | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,200 @@ | ||
| #pragma once | ||
|
|
||
| #include "envoy/grpc/async_client.h" | ||
|
|
||
| #include "common/common/enum_to_int.h" | ||
| #include "common/common/linked_object.h" | ||
| #include "common/grpc/codec.h" | ||
| #include "common/grpc/common.h" | ||
| #include "common/http/async_client_impl.h" | ||
| #include "common/http/header_map_impl.h" | ||
| #include "common/http/utility.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Grpc { | ||
|
|
||
| template <class RequestType, class ResponseType> class AsyncClientStreamImpl; | ||
|
|
||
| template <class RequestType, class ResponseType> | ||
| class AsyncClientImpl final : public AsyncClient<RequestType, ResponseType> { | ||
| public: | ||
| AsyncClientImpl(Upstream::ClusterManager& cm, const std::string& remote_cluster_name) | ||
| : cm_(cm), remote_cluster_name_(remote_cluster_name) {} | ||
|
|
||
| ~AsyncClientImpl() override { ASSERT(active_streams_.empty()); } | ||
|
|
||
| // Grpc::AsyncClient | ||
| AsyncClientStream<RequestType>* | ||
| start(const google::protobuf::MethodDescriptor& service_method, | ||
| AsyncClientCallbacks<ResponseType>& callbacks, | ||
| const Optional<std::chrono::milliseconds>& timeout) override { | ||
| std::unique_ptr<AsyncClientStreamImpl<RequestType, ResponseType>> grpc_stream{ | ||
| new AsyncClientStreamImpl<RequestType, ResponseType>(*this, callbacks)}; | ||
| Http::AsyncClient::Stream* http_stream = | ||
| cm_.httpAsyncClientForCluster(remote_cluster_name_) | ||
| .start(*grpc_stream, Optional<std::chrono::milliseconds>(timeout)); | ||
|
|
||
| if (http_stream == nullptr) { | ||
| callbacks.onRemoteClose(Status::GrpcStatus::Unavailable); | ||
| return nullptr; | ||
| } | ||
|
|
||
| grpc_stream->set_stream(http_stream); | ||
|
|
||
| Http::MessagePtr message = Common::prepareHeaders( | ||
| remote_cluster_name_, service_method.service()->full_name(), service_method.name()); | ||
| callbacks.onCreateInitialMetadata(message->headers()); | ||
|
|
||
| http_stream->sendHeaders(message->headers(), false); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Expose sendHeaders via grpc_stream. Sometimes, the initial metadata need to be prepared in an async routine.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't looked at this code in a while, but fairly certain that sendHeaders() can result in an inline reset. Will the code work in that case or does the logic below need to be modified to potentially detect and return nullptr?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if I see that in the current implementation (it can do a
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When you send headers, it's possible to get an immediate reset for example due to http/2 stream exhaustion or circuit breaking. I think that case, you want to return nullptr and raise a callback immediately? |
||
| grpc_stream->moveIntoList(std::move(grpc_stream), active_streams_); | ||
|
|
||
| return active_streams_.front().get(); | ||
| } | ||
|
|
||
| private: | ||
| Upstream::ClusterManager& cm_; | ||
| const std::string remote_cluster_name_; | ||
| std::list<std::unique_ptr<AsyncClientStreamImpl<RequestType, ResponseType>>> active_streams_; | ||
|
|
||
| friend class AsyncClientStreamImpl<RequestType, ResponseType>; | ||
| }; | ||
|
|
||
| template <class RequestType, class ResponseType> | ||
| class AsyncClientStreamImpl : public AsyncClientStream<RequestType>, | ||
| Http::AsyncClient::StreamCallbacks, | ||
| LinkedObject<AsyncClientStreamImpl<RequestType, ResponseType>> { | ||
| public: | ||
| AsyncClientStreamImpl(AsyncClientImpl<RequestType, ResponseType>& parent, | ||
| AsyncClientCallbacks<ResponseType>& callbacks) | ||
| : parent_(parent), callbacks_(callbacks) {} | ||
|
|
||
| // Http::AsyncClient::StreamCallbacks | ||
| void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override { | ||
| ASSERT(!remote_closed_); | ||
| if (Http::Utility::getResponseStatus(*headers) != enumToInt(Http::Code::OK)) { | ||
| streamError(Status::GrpcStatus::Internal); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The HTTP code to gRPC status code mapping is defined here We should follow this for non-2xx HTTP codes. |
||
| return; | ||
| } | ||
| if (end_stream) { | ||
| onTrailers(std::move(headers)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should fake trailers when headers are received actually.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. http://www.grpc.io/docs/guides/wire.html refers to this as a Trailers-Only response. |
||
| return; | ||
| } | ||
| callbacks_.onReceiveInitialMetadata(std::move(headers)); | ||
| } | ||
|
|
||
| void onData(Buffer::Instance& data, bool end_stream) override { | ||
| ASSERT(!remote_closed_); | ||
| std::vector<Frame> frames; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as a slight perf improvement might consider making this a class variable. In the streaming case, if we clear the frames after dispatch, I think we can retain the capacity and then use it again later (I think). Not a huge deal but though I would point it out. |
||
| if (!decoder_.decode(data, frames)) { | ||
| streamError(Status::GrpcStatus::Internal); | ||
| return; | ||
| } | ||
|
|
||
| for (const auto& frame : frames) { | ||
| std::unique_ptr<ResponseType> response(new ResponseType()); | ||
| // TODO(htuch): We can avoid linearizing the buffer here when Buffer::Instance implements | ||
| // protobuf ZeroCopyInputStream. | ||
| // TODO(htuch): Need to add support for compressed responses as well here. | ||
| if (frame.flags_ != GRPC_FH_DEFAULT || | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The response can be compressed. So, it can be GRPC_FH_DEFAULT | compressed flag.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do |
||
| !response->ParseFromArray(frame.data_->linearize(frame.data_->length()), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a TODO here for not do linearize. I will have a PR to convert Envoy |
||
| frame.data_->length())) { | ||
| streamError(Status::GrpcStatus::Internal); | ||
| return; | ||
| } | ||
| callbacks_.onReceiveMessage(std::move(response)); | ||
| } | ||
| if (end_stream) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is a protocol failure, should we check this before even trying to do the decode up above? |
||
| streamError(Status::GrpcStatus::Internal); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| void onTrailers(Http::HeaderMapPtr&& trailers) override { | ||
| ASSERT(!remote_closed_); | ||
| callbacks_.onReceiveTrailingMetadata(std::move(trailers)); | ||
|
|
||
| const Optional<Status::GrpcStatus> grpc_status = Common::getGrpcStatus(*trailers); | ||
| if (!grpc_status.valid()) { | ||
| streamError(Status::GrpcStatus::Internal); | ||
| return; | ||
| } | ||
| if (grpc_status.value() != Status::GrpcStatus::Ok) { | ||
| streamError(grpc_status.value()); | ||
| return; | ||
| } | ||
|
|
||
| callbacks_.onRemoteClose(Status::GrpcStatus::Ok); | ||
| closeRemote(); | ||
| } | ||
|
|
||
| void onReset() override { | ||
| if (http_reset_) { | ||
| return; | ||
| } | ||
|
|
||
| streamError(Status::GrpcStatus::Internal); | ||
| } | ||
|
|
||
| // Grpc::AsyncClientStream | ||
| void sendMessage(const RequestType& request) override { | ||
| stream_->sendData(*Common::serializeBody(request), false); | ||
| } | ||
|
|
||
| void close() override { closeLocal(); } | ||
|
|
||
| void reset() override { | ||
| closeLocal(); | ||
| closeRemote(); | ||
| } | ||
|
|
||
| void set_stream(Http::AsyncClient::Stream* stream) { stream_ = stream; } | ||
|
|
||
| private: | ||
| void streamError(Status::GrpcStatus grpc_status) { | ||
| callbacks_.onRemoteClose(grpc_status); | ||
| reset(); | ||
| } | ||
|
|
||
| void cleanup() { | ||
| if (!http_reset_) { | ||
| http_reset_ = true; | ||
| stream_->reset(); | ||
| } | ||
|
|
||
| // This will destroy us, but only do so if we are actually in a list. This does not happen in | ||
| // the immediate failure case. | ||
| if (LinkedObject<AsyncClientStreamImpl<RequestType, ResponseType>>::inserted()) { | ||
| LinkedObject<AsyncClientStreamImpl<RequestType, ResponseType>>::removeFromList( | ||
| parent_.active_streams_); | ||
| } | ||
| } | ||
|
|
||
| void closeLocal() { | ||
| local_closed_ |= true; | ||
| if (complete()) { | ||
| cleanup(); | ||
| } | ||
| } | ||
|
|
||
| void closeRemote() { | ||
| remote_closed_ |= true; | ||
| if (complete()) { | ||
| cleanup(); | ||
| } | ||
| } | ||
|
|
||
| bool complete() const { return local_closed_ && remote_closed_; } | ||
|
|
||
| AsyncClientImpl<RequestType, ResponseType>& parent_; | ||
| AsyncClientCallbacks<ResponseType>& callbacks_; | ||
| bool local_closed_{}; | ||
| bool remote_closed_{}; | ||
| bool http_reset_{}; | ||
| Http::AsyncClient::Stream* stream_{}; | ||
| Decoder decoder_; | ||
|
|
||
| friend class AsyncClientImpl<RequestType, ResponseType>; | ||
| }; | ||
|
|
||
| } // namespace Grpc | ||
| } // namespace Envoy | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
general comment. Can we open issue to track deprecating RpcChannel? This is superior and we should just get rid of the other code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, see #1102.