-
Notifications
You must be signed in to change notification settings - Fork 5.3k
gRPC streaming client. #1054
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
gRPC streaming client. #1054
Changes from all commits
8e0a11d
d715cae
482b3bb
2a608de
36b0c68
52c0985
5ac1896
d21b76b
211836a
a43d5c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| #pragma once | ||
|
|
||
| #include <chrono> | ||
|
|
||
| #include "envoy/common/optional.h" | ||
| #include "envoy/common/pure.h" | ||
| #include "envoy/grpc/status.h" | ||
| #include "envoy/http/header_map.h" | ||
|
|
||
| #include "google/protobuf/descriptor.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Grpc { | ||
|
|
||
| /** | ||
| * An in-flight gRPC stream. | ||
| */ | ||
| template <class RequestType> class AsyncClientStream { | ||
| public: | ||
| virtual ~AsyncClientStream() {} | ||
|
|
||
| /** | ||
| * Send request message to the stream. | ||
| * @param request protobuf serializable message. | ||
| */ | ||
| virtual void sendMessage(const RequestType& request) PURE; | ||
|
||
|
|
||
| /** | ||
| * Close the stream locally. No further methods may be invoked on the | ||
| * stream object, but callbacks may still be received until the stream is closed remotely. | ||
| */ | ||
| virtual void close() PURE; | ||
|
|
||
| /** | ||
| * Close the stream locally and remotely (as needed). No further methods may be invoked on the | ||
| * stream object and no further callbacks will be invoked. | ||
| */ | ||
| virtual void reset() PURE; | ||
| }; | ||
|
|
||
| /** | ||
| * Notifies caller of async gRPC stream status. | ||
| * Note the gRPC stream is full-duplex, even if the local to remote stream has been ended by | ||
| * AsyncClientStream.close(), AsyncClientCallbacks can continue to receive events until the remote | ||
| * to local stream is closed (onRemoteClose), and vice versa. Once the stream is closed remotely, no | ||
| * further callbacks will be invoked. | ||
| */ | ||
| template <class ResponseType> class AsyncClientCallbacks { | ||
| public: | ||
| virtual ~AsyncClientCallbacks() {} | ||
|
|
||
| /** | ||
| * Called when populating the headers to send with initial metadata. | ||
| * @param metadata initial metadata reference. | ||
| */ | ||
| virtual void onCreateInitialMetadata(Http::HeaderMap& metadata) PURE; | ||
|
||
|
|
||
| /** | ||
| * Called when initial metadata is recevied. | ||
| * @param metadata initial metadata reference. | ||
| */ | ||
| virtual void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) PURE; | ||
|
|
||
| /** | ||
| * Called when an async gRPC message is received. | ||
| * @param response the gRPC message. | ||
| */ | ||
| virtual void onReceiveMessage(std::unique_ptr<ResponseType>&& message) PURE; | ||
|
|
||
| /** | ||
| * Called when trailing metadata is recevied. | ||
| * @param metadata trailing metadata reference. | ||
| */ | ||
| virtual void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) PURE; | ||
|
|
||
| /** | ||
| * Called when the remote closes or an error occurs on the gRPC stream. The stream is | ||
| * considered remotely closed after this invocation and no further callbacks will be | ||
| * invoked. A non-Ok status implies that stream is also locally closed and that no | ||
| * further stream operations are permitted. | ||
| * @param status the gRPC status. | ||
| */ | ||
| virtual void onRemoteClose(Status::GrpcStatus status) PURE; | ||
| }; | ||
|
|
||
| /** | ||
| * Supports sending gRPC requests and receiving responses asynchronously. This can be used to | ||
| * implement either plain gRPC or streaming gRPC calls. | ||
| */ | ||
| template <class RequestType, class ResponseType> class AsyncClient { | ||
| public: | ||
| virtual ~AsyncClient() {} | ||
|
|
||
| /** | ||
| * Start a gRPC stream asynchronously. | ||
| * @param service_method protobuf descriptor of gRPC service method. | ||
| * @param callbacks the callbacks to be notified of stream status. | ||
| * @param timeout supplies the stream timeout, measured since when the frame with end_stream | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure off the top of my head how to fix this yet, but calling out that I'm not sure this definition of timeout makes sense for a streaming RPC. Not sure if we want to leave this here, or split interface somehow for streaming and unary?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was cribbing from the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We had previous discussion around this. The intended behavior for
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, I think it's fine for now, we can revisit later, we will only be using it with infinite timeout for the APIs. |
||
| * flag is sent until when the first frame is received. | ||
| * @return a stream handle or nullptr if no stream could be started. NOTE: In this case | ||
| * onRemoteClose() has already been called inline. The client owns the stream and | ||
| * the handle can be used to send more messages or finish the stream. It is expected that | ||
| * finish() is invoked by the caller to notify the client that the stream resources may | ||
| * be reclaimed. | ||
| */ | ||
| virtual AsyncClientStream<RequestType>* | ||
| start(const google::protobuf::MethodDescriptor& service_method, | ||
| AsyncClientCallbacks<ResponseType>& callbacks, | ||
| const Optional<std::chrono::milliseconds>& timeout) PURE; | ||
| }; | ||
|
|
||
| } // Grpc | ||
| } // Envoy | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,217 @@ | ||
| #pragma once | ||
|
|
||
| #include "envoy/grpc/async_client.h" | ||
|
|
||
| #include "common/common/enum_to_int.h" | ||
| #include "common/common/linked_object.h" | ||
| #include "common/grpc/codec.h" | ||
| #include "common/grpc/common.h" | ||
| #include "common/http/async_client_impl.h" | ||
| #include "common/http/header_map_impl.h" | ||
| #include "common/http/utility.h" | ||
|
|
||
| namespace Envoy { | ||
| namespace Grpc { | ||
|
|
||
| template <class RequestType, class ResponseType> class AsyncClientStreamImpl; | ||
|
|
||
| template <class RequestType, class ResponseType> | ||
| class AsyncClientImpl final : public AsyncClient<RequestType, ResponseType> { | ||
| public: | ||
| AsyncClientImpl(Upstream::ClusterManager& cm, const std::string& remote_cluster_name) | ||
| : cm_(cm), remote_cluster_name_(remote_cluster_name) {} | ||
|
|
||
| ~AsyncClientImpl() override { ASSERT(active_streams_.empty()); } | ||
|
|
||
| // Grpc::AsyncClient | ||
| AsyncClientStream<RequestType>* | ||
| start(const google::protobuf::MethodDescriptor& service_method, | ||
| AsyncClientCallbacks<ResponseType>& callbacks, | ||
| const Optional<std::chrono::milliseconds>& timeout) override { | ||
| std::unique_ptr<AsyncClientStreamImpl<RequestType, ResponseType>> grpc_stream{ | ||
| new AsyncClientStreamImpl<RequestType, ResponseType>(*this, callbacks)}; | ||
| Http::AsyncClient::Stream* http_stream = | ||
| cm_.httpAsyncClientForCluster(remote_cluster_name_) | ||
| .start(*grpc_stream, Optional<std::chrono::milliseconds>(timeout)); | ||
|
|
||
| if (http_stream == nullptr) { | ||
| callbacks.onRemoteClose(Status::GrpcStatus::Unavailable); | ||
| return nullptr; | ||
| } | ||
|
|
||
| grpc_stream->set_stream(http_stream); | ||
|
|
||
| Http::MessagePtr message = Common::prepareHeaders( | ||
| remote_cluster_name_, service_method.service()->full_name(), service_method.name()); | ||
| callbacks.onCreateInitialMetadata(message->headers()); | ||
|
|
||
| http_stream->sendHeaders(message->headers(), false); | ||
|
||
| // If sendHeaders() caused a reset, onRemoteClose() has been called inline and we should bail. | ||
| if (grpc_stream->http_reset_) { | ||
| return nullptr; | ||
| } | ||
|
|
||
| grpc_stream->moveIntoList(std::move(grpc_stream), active_streams_); | ||
| return active_streams_.front().get(); | ||
| } | ||
|
|
||
| private: | ||
| Upstream::ClusterManager& cm_; | ||
| const std::string remote_cluster_name_; | ||
| std::list<std::unique_ptr<AsyncClientStreamImpl<RequestType, ResponseType>>> active_streams_; | ||
|
|
||
| friend class AsyncClientStreamImpl<RequestType, ResponseType>; | ||
| }; | ||
|
|
||
| template <class RequestType, class ResponseType> | ||
| class AsyncClientStreamImpl : public AsyncClientStream<RequestType>, | ||
| Http::AsyncClient::StreamCallbacks, | ||
| LinkedObject<AsyncClientStreamImpl<RequestType, ResponseType>> { | ||
| public: | ||
| AsyncClientStreamImpl(AsyncClientImpl<RequestType, ResponseType>& parent, | ||
| AsyncClientCallbacks<ResponseType>& callbacks) | ||
| : parent_(parent), callbacks_(callbacks) {} | ||
|
|
||
| // Http::AsyncClient::StreamCallbacks | ||
| void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override { | ||
| ASSERT(!remote_closed_); | ||
| const auto http_response_status = Http::Utility::getResponseStatus(*headers); | ||
| if (http_response_status != enumToInt(Http::Code::OK)) { | ||
| // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that | ||
| // grpc-status be used if available. | ||
| if (end_stream && Common::getGrpcStatus(*headers).valid()) { | ||
| onTrailers(std::move(headers)); | ||
| return; | ||
| } | ||
| streamError(Common::httpToGrpcStatus(http_response_status)); | ||
| return; | ||
| } | ||
| if (end_stream) { | ||
| onTrailers(std::move(headers)); | ||
| return; | ||
| } | ||
| callbacks_.onReceiveInitialMetadata(std::move(headers)); | ||
| } | ||
|
|
||
| void onData(Buffer::Instance& data, bool end_stream) override { | ||
| ASSERT(!remote_closed_); | ||
| if (end_stream) { | ||
| streamError(Status::GrpcStatus::Internal); | ||
| return; | ||
| } | ||
|
|
||
| decoded_frames_.clear(); | ||
| if (!decoder_.decode(data, decoded_frames_)) { | ||
| streamError(Status::GrpcStatus::Internal); | ||
| return; | ||
| } | ||
|
|
||
| for (const auto& frame : decoded_frames_) { | ||
| std::unique_ptr<ResponseType> response(new ResponseType()); | ||
| // TODO(htuch): We can avoid linearizing the buffer here when Buffer::Instance implements | ||
| // protobuf ZeroCopyInputStream. | ||
| // TODO(htuch): Need to add support for compressed responses as well here. | ||
| if (frame.flags_ != GRPC_FH_DEFAULT || | ||
| !response->ParseFromArray(frame.data_->linearize(frame.data_->length()), | ||
| frame.data_->length())) { | ||
| streamError(Status::GrpcStatus::Internal); | ||
| return; | ||
| } | ||
| callbacks_.onReceiveMessage(std::move(response)); | ||
| } | ||
| } | ||
|
|
||
| void onTrailers(Http::HeaderMapPtr&& trailers) override { | ||
| ASSERT(!remote_closed_); | ||
|
|
||
| const Optional<Status::GrpcStatus> grpc_status = Common::getGrpcStatus(*trailers); | ||
| if (!grpc_status.valid()) { | ||
| streamError(Status::GrpcStatus::Internal); | ||
| return; | ||
| } | ||
| if (grpc_status.value() != Status::GrpcStatus::Ok) { | ||
| streamError(grpc_status.value()); | ||
| return; | ||
| } | ||
| callbacks_.onReceiveTrailingMetadata(std::move(trailers)); | ||
| callbacks_.onRemoteClose(Status::GrpcStatus::Ok); | ||
| closeRemote(); | ||
| } | ||
|
|
||
| void onReset() override { | ||
| if (http_reset_) { | ||
| return; | ||
| } | ||
|
|
||
| http_reset_ = true; | ||
| streamError(Status::GrpcStatus::Internal); | ||
| } | ||
|
|
||
| // Grpc::AsyncClientStream | ||
| void sendMessage(const RequestType& request) override { | ||
| stream_->sendData(*Common::serializeBody(request), false); | ||
| } | ||
|
|
||
| void close() override { closeLocal(); } | ||
|
|
||
| void reset() override { | ||
| // Both closeLocal() and closeRemote() might self-destruct the object. We don't use these below | ||
| // to avoid sequencing issues. | ||
| local_closed_ |= true; | ||
| remote_closed_ |= true; | ||
| cleanup(); | ||
| } | ||
|
|
||
| void set_stream(Http::AsyncClient::Stream* stream) { stream_ = stream; } | ||
|
|
||
| private: | ||
| void streamError(Status::GrpcStatus grpc_status) { | ||
| callbacks_.onRemoteClose(grpc_status); | ||
| reset(); | ||
| } | ||
|
|
||
| void cleanup() { | ||
| if (!http_reset_) { | ||
| http_reset_ = true; | ||
| stream_->reset(); | ||
| } | ||
|
|
||
| // This will destroy us, but only do so if we are actually in a list. This does not happen in | ||
| // the immediate failure case. | ||
| if (LinkedObject<AsyncClientStreamImpl<RequestType, ResponseType>>::inserted()) { | ||
| LinkedObject<AsyncClientStreamImpl<RequestType, ResponseType>>::removeFromList( | ||
| parent_.active_streams_); | ||
| } | ||
| } | ||
|
|
||
| void closeLocal() { | ||
| local_closed_ |= true; | ||
| if (complete()) { | ||
| cleanup(); | ||
| } | ||
| } | ||
|
|
||
| void closeRemote() { | ||
| remote_closed_ |= true; | ||
| if (complete()) { | ||
| cleanup(); | ||
| } | ||
| } | ||
|
|
||
| bool complete() const { return local_closed_ && remote_closed_; } | ||
|
|
||
| AsyncClientImpl<RequestType, ResponseType>& parent_; | ||
| AsyncClientCallbacks<ResponseType>& callbacks_; | ||
| bool local_closed_{}; | ||
| bool remote_closed_{}; | ||
| bool http_reset_{}; | ||
| Http::AsyncClient::Stream* stream_{}; | ||
| Decoder decoder_; | ||
| // This is a member to avoid reallocation on every onData(). | ||
| std::vector<Frame> decoded_frames_; | ||
|
|
||
| friend class AsyncClientImpl<RequestType, ResponseType>; | ||
| }; | ||
|
|
||
| } // namespace Grpc | ||
| } // namespace Envoy | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
general comment. Can we open issue to track deprecating RpcChannel? This is superior and we should just get rid of the other code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, see #1102.