diff --git a/include/envoy/grpc/BUILD b/include/envoy/grpc/BUILD index da93fb6638735..689e096807436 100644 --- a/include/envoy/grpc/BUILD +++ b/include/envoy/grpc/BUILD @@ -8,6 +8,17 @@ load( envoy_package() +envoy_cc_library( + name = "async_client_interface", + hdrs = ["async_client.h"], + external_deps = ["protobuf"], + deps = [ + ":status", + "//include/envoy/common:optional", + "//include/envoy/http:header_map_interface", + ], +) + envoy_cc_library( name = "rpc_channel_interface", hdrs = ["rpc_channel.h"], diff --git a/include/envoy/grpc/async_client.h b/include/envoy/grpc/async_client.h new file mode 100644 index 0000000000000..ba304d8195d9c --- /dev/null +++ b/include/envoy/grpc/async_client.h @@ -0,0 +1,113 @@ +#pragma once + +#include + +#include "envoy/common/optional.h" +#include "envoy/common/pure.h" +#include "envoy/grpc/status.h" +#include "envoy/http/header_map.h" + +#include "google/protobuf/descriptor.h" + +namespace Envoy { +namespace Grpc { + +/** + * An in-flight gRPC stream. + */ +template class AsyncClientStream { +public: + virtual ~AsyncClientStream() {} + + /** + * Send request message to the stream. + * @param request protobuf serializable message. + */ + virtual void sendMessage(const RequestType& request) PURE; + + /** + * Close the stream locally. No further methods may be invoked on the + * stream object, but callbacks may still be received until the stream is closed remotely. + */ + virtual void close() PURE; + + /** + * Close the stream locally and remotely (as needed). No further methods may be invoked on the + * stream object and no further callbacks will be invoked. + */ + virtual void reset() PURE; +}; + +/** + * Notifies caller of async gRPC stream status. + * Note the gRPC stream is full-duplex, even if the local to remote stream has been ended by + * AsyncClientStream.close(), AsyncClientCallbacks can continue to receive events until the remote + * to local stream is closed (onRemoteClose), and vice versa. Once the stream is closed remotely, no + * further callbacks will be invoked. + */ +template class AsyncClientCallbacks { +public: + virtual ~AsyncClientCallbacks() {} + + /** + * Called when populating the headers to send with initial metadata. + * @param metadata initial metadata reference. + */ + virtual void onCreateInitialMetadata(Http::HeaderMap& metadata) PURE; + + /** + * Called when initial metadata is recevied. + * @param metadata initial metadata reference. + */ + virtual void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) PURE; + + /** + * Called when an async gRPC message is received. + * @param response the gRPC message. + */ + virtual void onReceiveMessage(std::unique_ptr&& message) PURE; + + /** + * Called when trailing metadata is recevied. + * @param metadata trailing metadata reference. + */ + virtual void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) PURE; + + /** + * Called when the remote closes or an error occurs on the gRPC stream. The stream is + * considered remotely closed after this invocation and no further callbacks will be + * invoked. A non-Ok status implies that stream is also locally closed and that no + * further stream operations are permitted. + * @param status the gRPC status. + */ + virtual void onRemoteClose(Status::GrpcStatus status) PURE; +}; + +/** + * Supports sending gRPC requests and receiving responses asynchronously. This can be used to + * implement either plain gRPC or streaming gRPC calls. + */ +template class AsyncClient { +public: + virtual ~AsyncClient() {} + + /** + * Start a gRPC stream asynchronously. + * @param service_method protobuf descriptor of gRPC service method. + * @param callbacks the callbacks to be notified of stream status. + * @param timeout supplies the stream timeout, measured since when the frame with end_stream + * flag is sent until when the first frame is received. + * @return a stream handle or nullptr if no stream could be started. NOTE: In this case + * onRemoteClose() has already been called inline. The client owns the stream and + * the handle can be used to send more messages or finish the stream. It is expected that + * finish() is invoked by the caller to notify the client that the stream resources may + * be reclaimed. + */ + virtual AsyncClientStream* + start(const google::protobuf::MethodDescriptor& service_method, + AsyncClientCallbacks& callbacks, + const Optional& timeout) PURE; +}; + +} // Grpc +} // Envoy diff --git a/include/envoy/grpc/rpc_channel.h b/include/envoy/grpc/rpc_channel.h index 07bee12f15214..4d13634d5dd6e 100644 --- a/include/envoy/grpc/rpc_channel.h +++ b/include/envoy/grpc/rpc_channel.h @@ -48,6 +48,7 @@ class RpcChannelCallbacks { * can accept a RequestCallbacks object. An RpcChannel should be passed to the constructor of an RPC * stub generated via protoc using the "option cc_generic_services = true;" option. It can be used * for multiple service calls, but not concurrently. + * DEPRECATED: See https://github.com/lyft/envoy/issues/1102 */ class RpcChannel : public proto::RpcChannel { public: diff --git a/source/common/grpc/BUILD b/source/common/grpc/BUILD index cdc291da453da..da33e1340294d 100644 --- a/source/common/grpc/BUILD +++ b/source/common/grpc/BUILD @@ -8,6 +8,17 @@ load( envoy_package() +envoy_cc_library( + name = "async_client_lib", + hdrs = ["async_client_impl.h"], + deps = [ + ":codec_lib", + ":common_lib", + "//include/envoy/grpc:async_client_interface", + "//source/common/http:async_client_lib", + ], +) + envoy_cc_library( name = "codec_lib", srcs = ["codec.cc"], diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h new file mode 100644 index 0000000000000..50a115ce24ce0 --- /dev/null +++ b/source/common/grpc/async_client_impl.h @@ -0,0 +1,217 @@ +#pragma once + +#include "envoy/grpc/async_client.h" + +#include "common/common/enum_to_int.h" +#include "common/common/linked_object.h" +#include "common/grpc/codec.h" +#include "common/grpc/common.h" +#include "common/http/async_client_impl.h" +#include "common/http/header_map_impl.h" +#include "common/http/utility.h" + +namespace Envoy { +namespace Grpc { + +template class AsyncClientStreamImpl; + +template +class AsyncClientImpl final : public AsyncClient { +public: + AsyncClientImpl(Upstream::ClusterManager& cm, const std::string& remote_cluster_name) + : cm_(cm), remote_cluster_name_(remote_cluster_name) {} + + ~AsyncClientImpl() override { ASSERT(active_streams_.empty()); } + + // Grpc::AsyncClient + AsyncClientStream* + start(const google::protobuf::MethodDescriptor& service_method, + AsyncClientCallbacks& callbacks, + const Optional& timeout) override { + std::unique_ptr> grpc_stream{ + new AsyncClientStreamImpl(*this, callbacks)}; + Http::AsyncClient::Stream* http_stream = + cm_.httpAsyncClientForCluster(remote_cluster_name_) + .start(*grpc_stream, Optional(timeout)); + + if (http_stream == nullptr) { + callbacks.onRemoteClose(Status::GrpcStatus::Unavailable); + return nullptr; + } + + grpc_stream->set_stream(http_stream); + + Http::MessagePtr message = Common::prepareHeaders( + remote_cluster_name_, service_method.service()->full_name(), service_method.name()); + callbacks.onCreateInitialMetadata(message->headers()); + + http_stream->sendHeaders(message->headers(), false); + // If sendHeaders() caused a reset, onRemoteClose() has been called inline and we should bail. + if (grpc_stream->http_reset_) { + return nullptr; + } + + grpc_stream->moveIntoList(std::move(grpc_stream), active_streams_); + return active_streams_.front().get(); + } + +private: + Upstream::ClusterManager& cm_; + const std::string remote_cluster_name_; + std::list>> active_streams_; + + friend class AsyncClientStreamImpl; +}; + +template +class AsyncClientStreamImpl : public AsyncClientStream, + Http::AsyncClient::StreamCallbacks, + LinkedObject> { +public: + AsyncClientStreamImpl(AsyncClientImpl& parent, + AsyncClientCallbacks& callbacks) + : parent_(parent), callbacks_(callbacks) {} + + // Http::AsyncClient::StreamCallbacks + void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override { + ASSERT(!remote_closed_); + const auto http_response_status = Http::Utility::getResponseStatus(*headers); + if (http_response_status != enumToInt(Http::Code::OK)) { + // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that + // grpc-status be used if available. + if (end_stream && Common::getGrpcStatus(*headers).valid()) { + onTrailers(std::move(headers)); + return; + } + streamError(Common::httpToGrpcStatus(http_response_status)); + return; + } + if (end_stream) { + onTrailers(std::move(headers)); + return; + } + callbacks_.onReceiveInitialMetadata(std::move(headers)); + } + + void onData(Buffer::Instance& data, bool end_stream) override { + ASSERT(!remote_closed_); + if (end_stream) { + streamError(Status::GrpcStatus::Internal); + return; + } + + decoded_frames_.clear(); + if (!decoder_.decode(data, decoded_frames_)) { + streamError(Status::GrpcStatus::Internal); + return; + } + + for (const auto& frame : decoded_frames_) { + std::unique_ptr response(new ResponseType()); + // TODO(htuch): We can avoid linearizing the buffer here when Buffer::Instance implements + // protobuf ZeroCopyInputStream. + // TODO(htuch): Need to add support for compressed responses as well here. + if (frame.flags_ != GRPC_FH_DEFAULT || + !response->ParseFromArray(frame.data_->linearize(frame.data_->length()), + frame.data_->length())) { + streamError(Status::GrpcStatus::Internal); + return; + } + callbacks_.onReceiveMessage(std::move(response)); + } + } + + void onTrailers(Http::HeaderMapPtr&& trailers) override { + ASSERT(!remote_closed_); + + const Optional grpc_status = Common::getGrpcStatus(*trailers); + if (!grpc_status.valid()) { + streamError(Status::GrpcStatus::Internal); + return; + } + if (grpc_status.value() != Status::GrpcStatus::Ok) { + streamError(grpc_status.value()); + return; + } + callbacks_.onReceiveTrailingMetadata(std::move(trailers)); + callbacks_.onRemoteClose(Status::GrpcStatus::Ok); + closeRemote(); + } + + void onReset() override { + if (http_reset_) { + return; + } + + http_reset_ = true; + streamError(Status::GrpcStatus::Internal); + } + + // Grpc::AsyncClientStream + void sendMessage(const RequestType& request) override { + stream_->sendData(*Common::serializeBody(request), false); + } + + void close() override { closeLocal(); } + + void reset() override { + // Both closeLocal() and closeRemote() might self-destruct the object. We don't use these below + // to avoid sequencing issues. + local_closed_ |= true; + remote_closed_ |= true; + cleanup(); + } + + void set_stream(Http::AsyncClient::Stream* stream) { stream_ = stream; } + +private: + void streamError(Status::GrpcStatus grpc_status) { + callbacks_.onRemoteClose(grpc_status); + reset(); + } + + void cleanup() { + if (!http_reset_) { + http_reset_ = true; + stream_->reset(); + } + + // This will destroy us, but only do so if we are actually in a list. This does not happen in + // the immediate failure case. + if (LinkedObject>::inserted()) { + LinkedObject>::removeFromList( + parent_.active_streams_); + } + } + + void closeLocal() { + local_closed_ |= true; + if (complete()) { + cleanup(); + } + } + + void closeRemote() { + remote_closed_ |= true; + if (complete()) { + cleanup(); + } + } + + bool complete() const { return local_closed_ && remote_closed_; } + + AsyncClientImpl& parent_; + AsyncClientCallbacks& callbacks_; + bool local_closed_{}; + bool remote_closed_{}; + bool http_reset_{}; + Http::AsyncClient::Stream* stream_{}; + Decoder decoder_; + // This is a member to avoid reallocation on every onData(). + std::vector decoded_frames_; + + friend class AsyncClientImpl; +}; + +} // namespace Grpc +} // namespace Envoy diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 6ab0d6ff6c155..3bf6a107242f2 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -38,6 +38,28 @@ Optional Common::getGrpcStatus(const Http::HeaderMap& traile return Optional(static_cast(grpc_status_code)); } +Status::GrpcStatus Common::httpToGrpcStatus(uint64_t http_response_status) { + // From + // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + switch (http_response_status) { + case 400: + return Status::GrpcStatus::Internal; + case 401: + return Status::GrpcStatus::Unauthenticated; + case 403: + return Status::GrpcStatus::PermissionDenied; + case 404: + return Status::GrpcStatus::Unimplemented; + case 429: + case 502: + case 503: + case 504: + return Status::GrpcStatus::Unavailable; + default: + return Status::GrpcStatus::Unknown; + } +} + void Common::chargeStat(const Upstream::ClusterInfo& cluster, const std::string& grpc_service, const std::string& grpc_method, bool success) { cluster.statsScope() diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index 055fbd0617d83..47314eabdd7eb 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -28,11 +28,21 @@ class Common { public: /** * Returns the GrpcStatus code from a given set of headers, if present. - * @headers the headers to parse. - * @returns the parsed status code or InvalidCode if no valid status is found. + * @param headers the headers to parse. + * @return Optional the parsed status code or InvalidCode if no valid status + * is found. */ static Optional getGrpcStatus(const Http::HeaderMap& headers); + /** + * Returns the gRPC status code from a given HTTP response status code. Ordinarily, it is expected + * that a 200 response is provided, but gRPC defines a mapping for intermediaries that are not + * gRPC aware, see https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + * @param http_response_status HTTP status code. + * @return Status::GrpcStatus corresponding gRPC status code. + */ + static Status::GrpcStatus httpToGrpcStatus(uint64_t http_response_status); + /** * Charge a success/failure stat to a cluster/service/method. * @param cluster supplies the target cluster. diff --git a/source/common/grpc/rpc_channel_impl.h b/source/common/grpc/rpc_channel_impl.h index 41884e31c065b..f6678e25ac1a2 100644 --- a/source/common/grpc/rpc_channel_impl.h +++ b/source/common/grpc/rpc_channel_impl.h @@ -25,6 +25,7 @@ namespace Grpc { * needed. * 4) Inflight RPCs can be safely cancelled using cancel(). * 5) See GrpcRequestImplTest for an example. + * DEPRECATED: See https://github.com/lyft/envoy/issues/1102 */ class RpcChannelImpl : public RpcChannel, public Http::AsyncClient::Callbacks { public: diff --git a/test/common/grpc/BUILD b/test/common/grpc/BUILD index a34964177bdda..0eab7467b8446 100644 --- a/test/common/grpc/BUILD +++ b/test/common/grpc/BUILD @@ -8,6 +8,20 @@ load( envoy_package() +envoy_cc_test( + name = "async_client_impl_test", + srcs = ["async_client_impl_test.cc"], + deps = [ + "//source/common/grpc:async_client_lib", + "//test/mocks/buffer:buffer_mocks", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/http:http_mocks", + "//test/mocks/upstream:upstream_mocks", + "//test/proto:helloworld_proto", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "codec_test", srcs = ["codec_test.cc"], diff --git a/test/common/grpc/async_client_impl_test.cc b/test/common/grpc/async_client_impl_test.cc new file mode 100644 index 0000000000000..76daf0600d332 --- /dev/null +++ b/test/common/grpc/async_client_impl_test.cc @@ -0,0 +1,428 @@ +#include "common/grpc/async_client_impl.h" + +#include "test/mocks/buffer/mocks.h" +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/http/mocks.h" +#include "test/mocks/upstream/mocks.h" +#include "test/proto/helloworld.pb.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::Eq; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; +using testing::Mock; + +namespace Envoy { +namespace Grpc { + +template class AsyncClientImpl; +template class AsyncClientStreamImpl; + +namespace { + +const std::string HELLO_REQUEST = "ABC"; +// We expect the 5 byte header to only have a length of 5 indicating the size of the protobuf. The +// protobuf begins with 0x0a, indicating this is the first field of type string. This is followed +// by 0x03 for the number of characters and the name ABC set above. +const char HELLO_REQUEST_DATA[] = "\x00\x00\x00\x00\x05\x0a\x03\x41\x42\x43"; +const size_t HELLO_REQUEST_SIZE = sizeof(HELLO_REQUEST_DATA) - 1; + +const std::string HELLO_REPLY = "DEFG"; +const char HELLO_REPLY_DATA[] = "\x00\x00\x00\x00\x06\x0a\x04\x44\x45\x46\x47"; +const size_t HELLO_REPLY_SIZE = sizeof(HELLO_REPLY_DATA) - 1; + +MATCHER_P(HelloworldReplyEq, rhs, "") { return arg.message() == rhs; } + +typedef std::vector> TestMetadata; + +class HelloworldStream : public MockAsyncClientCallbacks { +public: + HelloworldStream() { + ON_CALL(http_stream_, reset()).WillByDefault(Invoke([this]() { http_callbacks_->onReset(); })); + } + + ~HelloworldStream() { + if (grpc_stream_ != nullptr) { + EXPECT_CALL(http_stream_, reset()); + grpc_stream_->reset(); + } + } + + void sendRequest() { + helloworld::HelloRequest request; + request.set_name(HELLO_REQUEST); + + EXPECT_CALL( + http_stream_, + sendData(BufferStringEqual(std::string(HELLO_REQUEST_DATA, HELLO_REQUEST_SIZE)), false)); + grpc_stream_->sendMessage(request); + Mock::VerifyAndClearExpectations(&http_stream_); + } + + void sendServerInitialMetadata(TestMetadata& metadata) { + Http::HeaderMapPtr reply_headers{new Http::TestHeaderMapImpl{{":status", "200"}}}; + for (auto& value : metadata) { + reply_headers->addStatic(value.first, value.second); + } + EXPECT_CALL(*this, onReceiveInitialMetadata_(HeaderMapEqualRef(reply_headers.get()))); + http_callbacks_->onHeaders(std::move(reply_headers), false); + } + + void sendReply() { + Buffer::OwnedImpl reply_buffer(HELLO_REPLY_DATA, HELLO_REPLY_SIZE); + + helloworld::HelloReply reply; + reply.set_message(HELLO_REPLY); + EXPECT_CALL(*this, onReceiveMessage_(HelloworldReplyEq(HELLO_REPLY))); + http_callbacks_->onData(reply_buffer, false); + } + + void expectGrpcStatus(Status::GrpcStatus grpc_status) { + if (grpc_status != Status::GrpcStatus::Ok) { + EXPECT_CALL(http_stream_, reset()); + } + EXPECT_CALL(*this, onRemoteClose(grpc_status)) + .WillOnce(Invoke([this](Status::GrpcStatus grpc_status) { + if (grpc_status != Status::GrpcStatus::Ok) { + clearStream(); + } + })); + } + + void sendServerTrailers(Status::GrpcStatus grpc_status, TestMetadata metadata, + bool trailers_only = false) { + auto* reply_trailers = + new Http::TestHeaderMapImpl{{"grpc-status", std::to_string(enumToInt(grpc_status))}}; + if (trailers_only) { + reply_trailers->addViaCopy(":status", "200"); + } + for (const auto& value : metadata) { + reply_trailers->addViaCopy(value.first, value.second); + } + Http::HeaderMapPtr reply_trailers_ptr{reply_trailers}; + if (grpc_status == Status::GrpcStatus::Ok) { + EXPECT_CALL(*this, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers))); + } + expectGrpcStatus(grpc_status); + if (trailers_only) { + http_callbacks_->onHeaders(std::move(reply_trailers_ptr), true); + } else { + http_callbacks_->onTrailers(std::move(reply_trailers_ptr)); + } + } + + void closeStream() { + EXPECT_CALL(http_stream_, reset()); + grpc_stream_->close(); + clearStream(); + } + + void clearStream() { grpc_stream_ = nullptr; } + + Http::AsyncClient::StreamCallbacks* http_callbacks_{}; + Http::MockAsyncClientStream http_stream_; + AsyncClientStream* grpc_stream_{}; +}; + +class GrpcAsyncClientImplTest : public testing::Test { +public: + GrpcAsyncClientImplTest() + : method_descriptor_(helloworld::Greeter::descriptor()->FindMethodByName("SayHello")), + grpc_client_(new AsyncClientImpl( + cm_, "test_cluster")) { + ON_CALL(cm_, httpAsyncClientForCluster("test_cluster")).WillByDefault(ReturnRef(http_client_)); + } + + std::unique_ptr createStream(TestMetadata& initial_metadata) { + std::unique_ptr stream(new HelloworldStream()); + std::vector keys; + EXPECT_CALL(*stream, onCreateInitialMetadata(_)) + .WillOnce(Invoke([&initial_metadata](Http::HeaderMap& headers) { + for (auto& value : initial_metadata) { + headers.addStatic(value.first, value.second); + } + })); + Http::TestHeaderMapImpl headers{{":method", "POST"}, + {":path", "/helloworld.Greeter/SayHello"}, + {":authority", "test_cluster"}, + {"content-type", "application/grpc"}}; + for (auto& value : initial_metadata) { + headers.addStatic(value.first, value.second); + } + EXPECT_CALL(http_client_, start(_, _)) + .WillOnce(Invoke([&stream](Http::AsyncClient::StreamCallbacks& callbacks, + const Optional& timeout) { + UNREFERENCED_PARAMETER(timeout); + stream->http_callbacks_ = &callbacks; + return &stream->http_stream_; + })); + EXPECT_CALL(stream->http_stream_, sendHeaders(HeaderMapEqualRef(&headers), _)); + stream->grpc_stream_ = + grpc_client_->start(*method_descriptor_, *stream, Optional()); + EXPECT_NE(stream->grpc_stream_, nullptr); + return stream; + } + + const google::protobuf::MethodDescriptor* method_descriptor_; + NiceMock http_client_; + NiceMock cm_; + std::unique_ptr> grpc_client_; +}; + +// Validate that a simple request-reply stream works. +TEST_F(GrpcAsyncClientImplTest, BasicStream) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata); + stream->closeStream(); +} + +// Validate that multiple streams work. +TEST_F(GrpcAsyncClientImplTest, MultiStream) { + TestMetadata empty_metadata; + auto stream_0 = createStream(empty_metadata); + auto stream_1 = createStream(empty_metadata); + stream_0->sendRequest(); + stream_1->sendRequest(); + stream_0->sendServerInitialMetadata(empty_metadata); + stream_0->sendReply(); + stream_1->sendServerTrailers(Status::GrpcStatus::Unavailable, empty_metadata); + stream_0->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata); + stream_0->closeStream(); +} + +// Validate that a failure in the HTTP client returns immediately with status +// UNAVAILABLE. +TEST_F(GrpcAsyncClientImplTest, HttpStartFail) { + MockAsyncClientCallbacks grpc_callbacks; + ON_CALL(http_client_, start(_, _)).WillByDefault(Return(nullptr)); + EXPECT_CALL(grpc_callbacks, onRemoteClose(Status::GrpcStatus::Unavailable)); + auto* grpc_stream = grpc_client_->start(*method_descriptor_, grpc_callbacks, + Optional()); + EXPECT_EQ(grpc_stream, nullptr); +} + +// Validate that a failure to sendHeaders() in the HTTP client returns +// immediately with status INTERNAL. +TEST_F(GrpcAsyncClientImplTest, HttpSendHeadersFail) { + MockAsyncClientCallbacks grpc_callbacks; + Http::AsyncClient::StreamCallbacks* http_callbacks; + Http::MockAsyncClientStream http_stream; + EXPECT_CALL(http_client_, start(_, _)) + .WillOnce(Invoke( + [&http_callbacks, &http_stream](Http::AsyncClient::StreamCallbacks& callbacks, + const Optional& timeout) { + UNREFERENCED_PARAMETER(timeout); + http_callbacks = &callbacks; + return &http_stream; + })); + EXPECT_CALL(grpc_callbacks, onCreateInitialMetadata(_)); + EXPECT_CALL(http_stream, sendHeaders(_, _)) + .WillOnce(Invoke([&http_callbacks](Http::HeaderMap& headers, bool end_stream) { + UNREFERENCED_PARAMETER(headers); + UNREFERENCED_PARAMETER(end_stream); + http_callbacks->onReset(); + })); + EXPECT_CALL(grpc_callbacks, onRemoteClose(Status::GrpcStatus::Internal)); + auto* grpc_stream = grpc_client_->start(*method_descriptor_, grpc_callbacks, + Optional()); + EXPECT_EQ(grpc_stream, nullptr); +} + +// Validate that a non-200 HTTP status results in the gRPC error as per +// https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. +TEST_F(GrpcAsyncClientImplTest, HttpNon200Status) { + for (const auto http_response_status : {400, 401, 403, 404, 429, 431}) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + Http::HeaderMapPtr reply_headers{ + new Http::TestHeaderMapImpl{{":status", std::to_string(http_response_status)}}}; + stream->expectGrpcStatus(Common::httpToGrpcStatus(http_response_status)); + stream->http_callbacks_->onHeaders(std::move(reply_headers), false); + } +} + +// Validate that a non-200 HTTP status results in fallback to grpc-status. +TEST_F(GrpcAsyncClientImplTest, GrpcStatusFallback) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + Http::HeaderMapPtr reply_headers{new Http::TestHeaderMapImpl{ + {":status", "404"}, + {"grpc-status", std::to_string(enumToInt(Status::GrpcStatus::PermissionDenied))}}}; + stream->expectGrpcStatus(Status::GrpcStatus::PermissionDenied); + stream->http_callbacks_->onHeaders(std::move(reply_headers), true); +} + +// Validate that a HTTP-level reset is handled as an INTERNAL gRPC error. +TEST_F(GrpcAsyncClientImplTest, HttpReset) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + EXPECT_CALL(*stream, onRemoteClose(Status::GrpcStatus::Internal)); + stream->http_callbacks_->onReset(); + stream->clearStream(); +} + +// Validate that a reply with bad gRPC framing is handled as an INTERNAL gRPC +// error. +TEST_F(GrpcAsyncClientImplTest, BadReplyGrpcFraming) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + Buffer::OwnedImpl reply_buffer("\xde\xad\xbe\xef\x00", 5); + stream->http_callbacks_->onData(reply_buffer, false); +} + +// Validate that a reply with bad protobuf is handled as an INTERNAL gRPC error. +TEST_F(GrpcAsyncClientImplTest, BadReplyProtobuf) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + Buffer::OwnedImpl reply_buffer("\x00\x00\x00\x00\x02\xff\xff", 7); + stream->http_callbacks_->onData(reply_buffer, false); +} + +// Validate that an out-of-range gRPC status is handled as an INVALID_CODE gRPC +// error. +TEST_F(GrpcAsyncClientImplTest, OutOfRangeGrpcStatus) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->expectGrpcStatus(Status::GrpcStatus::InvalidCode); + Http::HeaderMapPtr reply_trailers{ + new Http::TestHeaderMapImpl{{"grpc-status", std::to_string(0x1337)}}}; + stream->http_callbacks_->onTrailers(std::move(reply_trailers)); +} + +// Validate that a missing gRPC status is handled as an INTERNAL gRPC error. +TEST_F(GrpcAsyncClientImplTest, MissingGrpcStatus) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + Http::HeaderMapPtr reply_trailers{new Http::TestHeaderMapImpl{}}; + stream->http_callbacks_->onTrailers(std::move(reply_trailers)); +} + +// Validate that a reply terminated without trailers is handled as an INTERNAL +// gRPC error. +TEST_F(GrpcAsyncClientImplTest, ReplyNoTrailers) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + Buffer::OwnedImpl reply_buffer(HELLO_REPLY_DATA, HELLO_REPLY_SIZE); + helloworld::HelloReply reply; + reply.set_message(HELLO_REPLY); + stream->http_callbacks_->onData(reply_buffer, true); +} + +// Validate that send client initial metadata works. +TEST_F(GrpcAsyncClientImplTest, ClientInitialMetadata) { + TestMetadata initial_metadata = { + {Http::LowerCaseString("foo"), "bar"}, {Http::LowerCaseString("baz"), "blah"}, + }; + createStream(initial_metadata); +} + +// Validate that receiving server initial metadata works. +TEST_F(GrpcAsyncClientImplTest, ServerInitialMetadata) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + TestMetadata initial_metadata = { + {Http::LowerCaseString("foo"), "bar"}, {Http::LowerCaseString("baz"), "blah"}, + }; + stream->sendServerInitialMetadata(initial_metadata); +} + +// Validate that receiving server trailing metadata works. +TEST_F(GrpcAsyncClientImplTest, ServerTrailingMetadata) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + TestMetadata trailing_metadata = { + {Http::LowerCaseString("foo"), "bar"}, {Http::LowerCaseString("baz"), "blah"}, + }; + stream->sendServerTrailers(Status::GrpcStatus::Ok, trailing_metadata); +} + +// Validate that a trailers-only response is handled. +TEST_F(GrpcAsyncClientImplTest, TrailersOnly) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata, true); + stream->closeStream(); +} + +// Validate that a trailers RESOURCE_EXHAUSTED reply is handled. +TEST_F(GrpcAsyncClientImplTest, ResourceExhaustedError) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->sendServerTrailers(Status::GrpcStatus::ResourceExhausted, empty_metadata); +} + +// Validate that we can continue to receive after a local close. +TEST_F(GrpcAsyncClientImplTest, ReceiveAfterLocalClose) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->closeStream(); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata); +} + +// Validate that we can continue to send after a remote close. +TEST_F(GrpcAsyncClientImplTest, SendAfterRemoteClose) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata); + stream->sendRequest(); + stream->closeStream(); +} + +// Validate that reset() doesn't explode on a half-closed stream (local). +TEST_F(GrpcAsyncClientImplTest, resetAfterCloseLocal) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->grpc_stream_->close(); + EXPECT_CALL(stream->http_stream_, reset()); + stream->grpc_stream_->reset(); + stream->clearStream(); +} + +// Validate that reset() doesn't explode on a half-closed stream (remote). +TEST_F(GrpcAsyncClientImplTest, resetAfterCloseRemote) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata, true); + EXPECT_CALL(stream->http_stream_, reset()); + stream->grpc_stream_->reset(); + stream->clearStream(); +} + +} // namespace +} // namespace Grpc +} // namespace Envoy diff --git a/test/mocks/grpc/BUILD b/test/mocks/grpc/BUILD index 08552031d52fa..8e98d6e50d93a 100644 --- a/test/mocks/grpc/BUILD +++ b/test/mocks/grpc/BUILD @@ -12,5 +12,8 @@ envoy_cc_mock( name = "grpc_mocks", srcs = ["mocks.cc"], hdrs = ["mocks.h"], - deps = ["//include/envoy/grpc:rpc_channel_interface"], + deps = [ + "//include/envoy/grpc:async_client_interface", + "//include/envoy/grpc:rpc_channel_interface", + ], ) diff --git a/test/mocks/grpc/mocks.h b/test/mocks/grpc/mocks.h index 2c64ef644c772..779bad14f9936 100644 --- a/test/mocks/grpc/mocks.h +++ b/test/mocks/grpc/mocks.h @@ -3,6 +3,7 @@ #include #include +#include "envoy/grpc/async_client.h" #include "envoy/grpc/rpc_channel.h" #include "gmock/gmock.h" @@ -10,6 +11,42 @@ namespace Envoy { namespace Grpc { +template class MockAsyncClientStream : public AsyncClientStream { +public: + MOCK_METHOD1_T(sendMessage, void(const RequestType& request)); + MOCK_METHOD0_T(close, void()); + MOCK_METHOD0_T(reset, void()); +}; + +template +class MockAsyncClientCallbacks : public AsyncClientCallbacks { +public: + void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) { + onReceiveInitialMetadata_(*metadata); + } + + void onReceiveMessage(std::unique_ptr&& message) { onReceiveMessage_(*message); } + + void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) { + onReceiveTrailingMetadata_(*metadata); + } + + MOCK_METHOD1_T(onCreateInitialMetadata, void(Http::HeaderMap& metadata)); + MOCK_METHOD1_T(onReceiveInitialMetadata_, void(const Http::HeaderMap& metadata)); + MOCK_METHOD1_T(onReceiveMessage_, void(const ResponseType& message)); + MOCK_METHOD1_T(onReceiveTrailingMetadata_, void(const Http::HeaderMap& metadata)); + MOCK_METHOD1_T(onRemoteClose, void(Status::GrpcStatus status)); +}; + +template +class MockAsyncClient : public AsyncClient { +public: + MOCK_METHOD4_T(start, AsyncClientStream*( + const google::protobuf::MethodDescriptor& service_method, + AsyncClientCallbacks& callbacks, + const Optional& timeout)); +}; + class MockRpcChannelCallbacks : public RpcChannelCallbacks { public: MockRpcChannelCallbacks(); diff --git a/test/mocks/http/mocks.cc b/test/mocks/http/mocks.cc index 2b56c660d3cde..77e63cdd10259 100644 --- a/test/mocks/http/mocks.cc +++ b/test/mocks/http/mocks.cc @@ -131,6 +131,9 @@ MockAsyncClientStreamCallbacks::~MockAsyncClientStreamCallbacks() {} MockAsyncClientRequest::MockAsyncClientRequest(MockAsyncClient* client) : client_(client) {} MockAsyncClientRequest::~MockAsyncClientRequest() { client_->onRequestDestroy(); } +MockAsyncClientStream::MockAsyncClientStream() {} +MockAsyncClientStream::~MockAsyncClientStream() {} + MockFilterChainFactoryCallbacks::MockFilterChainFactoryCallbacks() {} MockFilterChainFactoryCallbacks::~MockFilterChainFactoryCallbacks() {} diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index c316cfa002b4f..797de126117fb 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -388,6 +388,17 @@ class MockAsyncClientRequest : public AsyncClient::Request { MockAsyncClient* client_; }; +class MockAsyncClientStream : public AsyncClient::Stream { +public: + MockAsyncClientStream(); + ~MockAsyncClientStream(); + + MOCK_METHOD2(sendHeaders, void(HeaderMap& headers, bool end_stream)); + MOCK_METHOD2(sendData, void(Buffer::Instance& data, bool end_stream)); + MOCK_METHOD1(sendTrailers, void(HeaderMap& trailers)); + MOCK_METHOD0(reset, void()); +}; + class MockFilterChainFactoryCallbacks : public Http::FilterChainFactoryCallbacks { public: MockFilterChainFactoryCallbacks();