From 8e0a11d8e3f6091dc24c64467e28c06e162cf9e9 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 6 Jun 2017 10:54:20 -0400 Subject: [PATCH 01/10] gRPC streaming client. This is a templatized gRPC client than can be used to implement unary/streaming (client, server, bidi) for arbitrary protobuf request/response messages. This will be used by the v2 xDS API clients for gRPC streams. --- include/envoy/grpc/BUILD | 11 + include/envoy/grpc/async_client.h | 113 +++++++ source/common/grpc/BUILD | 11 + source/common/grpc/async_client_impl.h | 200 +++++++++++ test/common/grpc/BUILD | 14 + test/common/grpc/async_client_impl_test.cc | 366 +++++++++++++++++++++ test/mocks/grpc/BUILD | 5 +- test/mocks/grpc/mocks.h | 37 +++ test/mocks/http/mocks.h | 8 + 9 files changed, 764 insertions(+), 1 deletion(-) create mode 100644 include/envoy/grpc/async_client.h create mode 100644 source/common/grpc/async_client_impl.h create mode 100644 test/common/grpc/async_client_impl_test.cc diff --git a/include/envoy/grpc/BUILD b/include/envoy/grpc/BUILD index da93fb6638735..689e096807436 100644 --- a/include/envoy/grpc/BUILD +++ b/include/envoy/grpc/BUILD @@ -8,6 +8,17 @@ load( envoy_package() +envoy_cc_library( + name = "async_client_interface", + hdrs = ["async_client.h"], + external_deps = ["protobuf"], + deps = [ + ":status", + "//include/envoy/common:optional", + "//include/envoy/http:header_map_interface", + ], +) + envoy_cc_library( name = "rpc_channel_interface", hdrs = ["rpc_channel.h"], diff --git a/include/envoy/grpc/async_client.h b/include/envoy/grpc/async_client.h new file mode 100644 index 0000000000000..ba304d8195d9c --- /dev/null +++ b/include/envoy/grpc/async_client.h @@ -0,0 +1,113 @@ +#pragma once + +#include + +#include "envoy/common/optional.h" +#include "envoy/common/pure.h" +#include "envoy/grpc/status.h" +#include "envoy/http/header_map.h" + +#include "google/protobuf/descriptor.h" + +namespace Envoy { +namespace Grpc { + +/** + * An in-flight gRPC stream. + */ +template class AsyncClientStream { +public: + virtual ~AsyncClientStream() {} + + /** + * Send request message to the stream. + * @param request protobuf serializable message. + */ + virtual void sendMessage(const RequestType& request) PURE; + + /** + * Close the stream locally. No further methods may be invoked on the + * stream object, but callbacks may still be received until the stream is closed remotely. + */ + virtual void close() PURE; + + /** + * Close the stream locally and remotely (as needed). No further methods may be invoked on the + * stream object and no further callbacks will be invoked. + */ + virtual void reset() PURE; +}; + +/** + * Notifies caller of async gRPC stream status. + * Note the gRPC stream is full-duplex, even if the local to remote stream has been ended by + * AsyncClientStream.close(), AsyncClientCallbacks can continue to receive events until the remote + * to local stream is closed (onRemoteClose), and vice versa. Once the stream is closed remotely, no + * further callbacks will be invoked. + */ +template class AsyncClientCallbacks { +public: + virtual ~AsyncClientCallbacks() {} + + /** + * Called when populating the headers to send with initial metadata. + * @param metadata initial metadata reference. + */ + virtual void onCreateInitialMetadata(Http::HeaderMap& metadata) PURE; + + /** + * Called when initial metadata is recevied. + * @param metadata initial metadata reference. + */ + virtual void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) PURE; + + /** + * Called when an async gRPC message is received. + * @param response the gRPC message. + */ + virtual void onReceiveMessage(std::unique_ptr&& message) PURE; + + /** + * Called when trailing metadata is recevied. + * @param metadata trailing metadata reference. + */ + virtual void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) PURE; + + /** + * Called when the remote closes or an error occurs on the gRPC stream. The stream is + * considered remotely closed after this invocation and no further callbacks will be + * invoked. A non-Ok status implies that stream is also locally closed and that no + * further stream operations are permitted. + * @param status the gRPC status. + */ + virtual void onRemoteClose(Status::GrpcStatus status) PURE; +}; + +/** + * Supports sending gRPC requests and receiving responses asynchronously. This can be used to + * implement either plain gRPC or streaming gRPC calls. + */ +template class AsyncClient { +public: + virtual ~AsyncClient() {} + + /** + * Start a gRPC stream asynchronously. + * @param service_method protobuf descriptor of gRPC service method. + * @param callbacks the callbacks to be notified of stream status. + * @param timeout supplies the stream timeout, measured since when the frame with end_stream + * flag is sent until when the first frame is received. + * @return a stream handle or nullptr if no stream could be started. NOTE: In this case + * onRemoteClose() has already been called inline. The client owns the stream and + * the handle can be used to send more messages or finish the stream. It is expected that + * finish() is invoked by the caller to notify the client that the stream resources may + * be reclaimed. + */ + virtual AsyncClientStream* + start(const google::protobuf::MethodDescriptor& service_method, + AsyncClientCallbacks& callbacks, + const Optional& timeout) PURE; +}; + +} // Grpc +} // Envoy diff --git a/source/common/grpc/BUILD b/source/common/grpc/BUILD index cdc291da453da..da33e1340294d 100644 --- a/source/common/grpc/BUILD +++ b/source/common/grpc/BUILD @@ -8,6 +8,17 @@ load( envoy_package() +envoy_cc_library( + name = "async_client_lib", + hdrs = ["async_client_impl.h"], + deps = [ + ":codec_lib", + ":common_lib", + "//include/envoy/grpc:async_client_interface", + "//source/common/http:async_client_lib", + ], +) + envoy_cc_library( name = "codec_lib", srcs = ["codec.cc"], diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h new file mode 100644 index 0000000000000..2bfb3f83faf27 --- /dev/null +++ b/source/common/grpc/async_client_impl.h @@ -0,0 +1,200 @@ +#pragma once + +#include "envoy/grpc/async_client.h" + +#include "common/common/enum_to_int.h" +#include "common/common/linked_object.h" +#include "common/grpc/codec.h" +#include "common/grpc/common.h" +#include "common/http/async_client_impl.h" +#include "common/http/header_map_impl.h" +#include "common/http/utility.h" + +namespace Envoy { +namespace Grpc { + +template class AsyncClientStreamImpl; + +template +class AsyncClientImpl final : public AsyncClient { +public: + AsyncClientImpl(Upstream::ClusterManager& cm, const std::string& remote_cluster_name) + : cm_(cm), remote_cluster_name_(remote_cluster_name) {} + + ~AsyncClientImpl() override { ASSERT(active_streams_.empty()); } + + // Grpc::AsyncClient + AsyncClientStream* + start(const google::protobuf::MethodDescriptor& service_method, + AsyncClientCallbacks& callbacks, + const Optional& timeout) override { + std::unique_ptr> grpc_stream{ + new AsyncClientStreamImpl(*this, callbacks)}; + Http::AsyncClient::Stream* http_stream = + cm_.httpAsyncClientForCluster(remote_cluster_name_) + .start(*grpc_stream, Optional(timeout)); + + if (http_stream == nullptr) { + callbacks.onRemoteClose(Status::GrpcStatus::Unavailable); + return nullptr; + } + + grpc_stream->set_stream(http_stream); + + Http::MessagePtr message = Common::prepareHeaders( + remote_cluster_name_, service_method.service()->full_name(), service_method.name()); + callbacks.onCreateInitialMetadata(message->headers()); + + http_stream->sendHeaders(message->headers(), false); + grpc_stream->moveIntoList(std::move(grpc_stream), active_streams_); + + return active_streams_.front().get(); + } + +private: + Upstream::ClusterManager& cm_; + const std::string remote_cluster_name_; + std::list>> active_streams_; + + friend class AsyncClientStreamImpl; +}; + +template +class AsyncClientStreamImpl : public AsyncClientStream, + Http::AsyncClient::StreamCallbacks, + LinkedObject> { +public: + AsyncClientStreamImpl(AsyncClientImpl& parent, + AsyncClientCallbacks& callbacks) + : parent_(parent), callbacks_(callbacks) {} + + // Http::AsyncClient::StreamCallbacks + void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override { + ASSERT(!remote_closed_); + if (Http::Utility::getResponseStatus(*headers) != enumToInt(Http::Code::OK)) { + streamError(Status::GrpcStatus::Internal); + return; + } + if (end_stream) { + onTrailers(std::move(headers)); + return; + } + callbacks_.onReceiveInitialMetadata(std::move(headers)); + } + + void onData(Buffer::Instance& data, bool end_stream) override { + ASSERT(!remote_closed_); + std::vector frames; + if (!decoder_.decode(data, frames)) { + streamError(Status::GrpcStatus::Internal); + return; + } + + for (const auto& frame : frames) { + std::unique_ptr response(new ResponseType()); + // TODO(htuch): We can avoid linearizing the buffer here when Buffer::Instance implements + // protobuf ZeroCopyInputStream. + // TODO(htuch): Need to add support for compressed responses as well here. + if (frame.flags_ != GRPC_FH_DEFAULT || + !response->ParseFromArray(frame.data_->linearize(frame.data_->length()), + frame.data_->length())) { + streamError(Status::GrpcStatus::Internal); + return; + } + callbacks_.onReceiveMessage(std::move(response)); + } + if (end_stream) { + streamError(Status::GrpcStatus::Internal); + return; + } + } + + void onTrailers(Http::HeaderMapPtr&& trailers) override { + ASSERT(!remote_closed_); + callbacks_.onReceiveTrailingMetadata(std::move(trailers)); + + const Optional grpc_status = Common::getGrpcStatus(*trailers); + if (!grpc_status.valid()) { + streamError(Status::GrpcStatus::Internal); + return; + } + if (grpc_status.value() != Status::GrpcStatus::Ok) { + streamError(grpc_status.value()); + return; + } + + callbacks_.onRemoteClose(Status::GrpcStatus::Ok); + closeRemote(); + } + + void onReset() override { + if (http_reset_) { + return; + } + + streamError(Status::GrpcStatus::Internal); + } + + // Grpc::AsyncClientStream + void sendMessage(const RequestType& request) override { + stream_->sendData(*Common::serializeBody(request), false); + } + + void close() override { closeLocal(); } + + void reset() override { + closeLocal(); + closeRemote(); + } + + void set_stream(Http::AsyncClient::Stream* stream) { stream_ = stream; } + +private: + void streamError(Status::GrpcStatus grpc_status) { + callbacks_.onRemoteClose(grpc_status); + reset(); + } + + void cleanup() { + if (!http_reset_) { + http_reset_ = true; + stream_->reset(); + } + + // This will destroy us, but only do so if we are actually in a list. This does not happen in + // the immediate failure case. + if (LinkedObject>::inserted()) { + LinkedObject>::removeFromList( + parent_.active_streams_); + } + } + + void closeLocal() { + local_closed_ |= true; + if (complete()) { + cleanup(); + } + } + + void closeRemote() { + remote_closed_ |= true; + if (complete()) { + cleanup(); + } + } + + bool complete() const { return local_closed_ && remote_closed_; } + + AsyncClientImpl& parent_; + AsyncClientCallbacks& callbacks_; + bool local_closed_{}; + bool remote_closed_{}; + bool http_reset_{}; + Http::AsyncClient::Stream* stream_{}; + Decoder decoder_; + + friend class AsyncClientImpl; +}; + +} // namespace Grpc +} // namespace Envoy diff --git a/test/common/grpc/BUILD b/test/common/grpc/BUILD index a34964177bdda..0eab7467b8446 100644 --- a/test/common/grpc/BUILD +++ b/test/common/grpc/BUILD @@ -8,6 +8,20 @@ load( envoy_package() +envoy_cc_test( + name = "async_client_impl_test", + srcs = ["async_client_impl_test.cc"], + deps = [ + "//source/common/grpc:async_client_lib", + "//test/mocks/buffer:buffer_mocks", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/http:http_mocks", + "//test/mocks/upstream:upstream_mocks", + "//test/proto:helloworld_proto", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "codec_test", srcs = ["codec_test.cc"], diff --git a/test/common/grpc/async_client_impl_test.cc b/test/common/grpc/async_client_impl_test.cc new file mode 100644 index 0000000000000..bca31f788808a --- /dev/null +++ b/test/common/grpc/async_client_impl_test.cc @@ -0,0 +1,366 @@ +#include "common/grpc/async_client_impl.h" + +#include "test/mocks/buffer/mocks.h" +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/http/mocks.h" +#include "test/mocks/upstream/mocks.h" +#include "test/proto/helloworld.pb.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +using testing::_; +using testing::Invoke; +using testing::Eq; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; +using testing::Mock; + +namespace Grpc { + +template class AsyncClientImpl; +template class AsyncClientStreamImpl; + +namespace { + +const std::string HELLO_REQUEST = "ABC"; +// We expect the 5 byte header to only have a length of 5 indicating the size of the protobuf. The +// protobuf beings with 0x0a, indicating this is the first field of type string. This is followed +// by 0x03 for the number of characters and the name ABC set above. +const char HELLO_REQUEST_DATA[] = "\x00\x00\x00\x00\x05\x0a\x03\x41\x42\x43"; +const size_t HELLO_REQUEST_SIZE = sizeof(HELLO_REQUEST_DATA) - 1; + +const std::string HELLO_REPLY = "DEFG"; +const char HELLO_REPLY_DATA[] = "\x00\x00\x00\x00\x06\x0a\x04\x44\x45\x46\x47"; +const size_t HELLO_REPLY_SIZE = sizeof(HELLO_REPLY_DATA) - 1; + +MATCHER_P(HelloworldReplyEq, rhs, "") { return arg.message() == rhs; } + +typedef std::vector> TestMetadata; + +class HelloworldStream : public MockAsyncClientCallbacks { +public: + HelloworldStream() { + ON_CALL(http_stream_, reset()).WillByDefault(Invoke([this]() { http_callbacks_->onReset(); })); + } + + ~HelloworldStream() { + if (grpc_stream_ != nullptr) { + EXPECT_CALL(http_stream_, reset()); + grpc_stream_->reset(); + } + } + + void sendRequest() { + helloworld::HelloRequest request; + request.set_name(HELLO_REQUEST); + + EXPECT_CALL( + http_stream_, + sendData(BufferStringEqual(std::string(HELLO_REQUEST_DATA, HELLO_REQUEST_SIZE)), false)); + grpc_stream_->sendMessage(request); + Mock::VerifyAndClearExpectations(&http_stream_); + } + + void sendServerInitialMetadata(TestMetadata& metadata) { + Http::HeaderMapPtr reply_headers{new Http::TestHeaderMapImpl{{":status", "200"}}}; + for (auto& value : metadata) { + reply_headers->addStatic(value.first, value.second); + } + EXPECT_CALL(*this, onReceiveInitialMetadata_(HeaderMapEqualRef(reply_headers.get()))); + http_callbacks_->onHeaders(std::move(reply_headers), false); + } + + void sendReply() { + Buffer::OwnedImpl reply_buffer(HELLO_REPLY_DATA, HELLO_REPLY_SIZE); + + helloworld::HelloReply reply; + reply.set_message(HELLO_REPLY); + EXPECT_CALL(*this, onReceiveMessage_(HelloworldReplyEq(HELLO_REPLY))); + http_callbacks_->onData(reply_buffer, false); + } + + void expectGrpcStatus(Status::GrpcStatus grpc_status) { + if (grpc_status != Status::GrpcStatus::Ok) { + EXPECT_CALL(http_stream_, reset()); + } + EXPECT_CALL(*this, onRemoteClose(grpc_status)) + .WillOnce(Invoke([this](Status::GrpcStatus grpc_status) { + if (grpc_status != Status::GrpcStatus::Ok) { + clearStream(); + } + })); + } + + void sendServerTrailers(Status::GrpcStatus grpc_status, TestMetadata metadata, + bool trailers_only = false) { + Http::HeaderMapPtr reply_trailers{ + new Http::TestHeaderMapImpl{{"grpc-status", std::to_string(enumToInt(grpc_status))}}}; + if (trailers_only) { + reply_trailers->addStatic(Http::LowerCaseString(":status"), "200"); + } + for (auto& value : metadata) { + reply_trailers->addStatic(value.first, value.second); + } + EXPECT_CALL(*this, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers.get()))); + expectGrpcStatus(grpc_status); + if (trailers_only) { + http_callbacks_->onHeaders(std::move(reply_trailers), true); + } else { + http_callbacks_->onTrailers(std::move(reply_trailers)); + } + } + + void closeStream() { + EXPECT_CALL(http_stream_, reset()); + grpc_stream_->close(); + clearStream(); + } + + void clearStream() { grpc_stream_ = nullptr; } + + Http::AsyncClient::StreamCallbacks* http_callbacks_{}; + Http::MockAsyncClientStream http_stream_; + AsyncClientStream* grpc_stream_{}; +}; + +class GrpcAsyncClientImplTest : public testing::Test { +public: + GrpcAsyncClientImplTest() + : method_descriptor_(helloworld::Greeter::descriptor()->FindMethodByName("SayHello")), + grpc_client_(new AsyncClientImpl( + cm_, "test_cluster")) { + ON_CALL(cm_, httpAsyncClientForCluster("test_cluster")).WillByDefault(ReturnRef(http_client_)); + } + + std::unique_ptr createStream(TestMetadata& initial_metadata) { + std::unique_ptr stream(new HelloworldStream()); + std::vector keys; + EXPECT_CALL(*stream, onCreateInitialMetadata(_)) + .WillOnce(Invoke([&initial_metadata](Http::HeaderMap& headers) { + for (auto& value : initial_metadata) { + headers.addStatic(value.first, value.second); + } + })); + Http::TestHeaderMapImpl headers{{":method", "POST"}, + {":path", "/helloworld.Greeter/SayHello"}, + {":authority", "test_cluster"}, + {"content-type", "application/grpc"}}; + for (auto& value : initial_metadata) { + headers.addStatic(value.first, value.second); + } + EXPECT_CALL(stream->http_stream_, sendHeaders(HeaderMapEqualRef(&headers), _)); + + ON_CALL(http_client_, start(_, _)) + .WillByDefault(Invoke([this, &stream](Http::AsyncClient::StreamCallbacks& callbacks, + const Optional& timeout) { + UNREFERENCED_PARAMETER(timeout); + stream->http_callbacks_ = &callbacks; + return &stream->http_stream_; + })); + stream->grpc_stream_ = + grpc_client_->start(*method_descriptor_, *stream, Optional()); + EXPECT_NE(stream->grpc_stream_, nullptr); + return stream; + } + + const google::protobuf::MethodDescriptor* method_descriptor_; + NiceMock http_client_; + NiceMock cm_; + std::unique_ptr> grpc_client_; +}; + +// Validate that a simple request-reply stream works. +TEST_F(GrpcAsyncClientImplTest, BasicStream) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata); + stream->closeStream(); +} + +// Validate that multiple streams work. +TEST_F(GrpcAsyncClientImplTest, MultiStream) { + TestMetadata empty_metadata; + auto stream_0 = createStream(empty_metadata); + auto stream_1 = createStream(empty_metadata); + stream_0->sendRequest(); + stream_1->sendRequest(); + stream_0->sendServerInitialMetadata(empty_metadata); + stream_0->sendReply(); + stream_1->sendServerTrailers(Status::GrpcStatus::Unavailable, empty_metadata); + stream_0->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata); + stream_0->closeStream(); +} + +// Validate that a failure in the HTTP client returns immediately with status +// UNAVAILABLE. +TEST_F(GrpcAsyncClientImplTest, HttpStartFail) { + MockAsyncClientCallbacks grpc_callbacks; + ON_CALL(http_client_, start(_, _)).WillByDefault(Return(nullptr)); + EXPECT_CALL(grpc_callbacks, onRemoteClose(Status::GrpcStatus::Unavailable)); + auto* grpc_stream = grpc_client_->start(*method_descriptor_, grpc_callbacks, + Optional()); + EXPECT_EQ(grpc_stream, nullptr); +} + +// Validate that a non-200 HTTP status results in an INTERNAL gRPC error. +TEST_F(GrpcAsyncClientImplTest, HttpNon200Status) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + Http::HeaderMapPtr reply_headers{new Http::TestHeaderMapImpl{{":status", "404"}}}; + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + stream->http_callbacks_->onHeaders(std::move(reply_headers), false); +} + +// Validate that a HTTP-level reset is handled as an INTERNAL gRPC error. +TEST_F(GrpcAsyncClientImplTest, HttpReset) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + stream->http_callbacks_->onReset(); +} + +// Validate that a reply with bad gRPC framing is handled as an INTERNAL gRPC +// error. +TEST_F(GrpcAsyncClientImplTest, BadReplyGrpcFraming) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + Buffer::OwnedImpl reply_buffer("\xde\xad\xbe\xef\x00", 5); + stream->http_callbacks_->onData(reply_buffer, false); +} + +// Validate that a reply with bad protobuf is handled as an INTERNAL gRPC error. +TEST_F(GrpcAsyncClientImplTest, BadReplyProtobuf) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + Buffer::OwnedImpl reply_buffer("\x00\x00\x00\x00\x02\xff\xff", 7); + stream->http_callbacks_->onData(reply_buffer, false); +} + +// Validate that an out-of-range gRPC status is handled as an INVALID_CODE gRPC +// error. +TEST_F(GrpcAsyncClientImplTest, OutOfRangeGrpcStatus) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->expectGrpcStatus(Status::GrpcStatus::InvalidCode); + Http::HeaderMapPtr reply_trailers{ + new Http::TestHeaderMapImpl{{"grpc-status", std::to_string(0x1337)}}}; + EXPECT_CALL(*stream, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers.get()))); + stream->http_callbacks_->onTrailers(std::move(reply_trailers)); +} + +// Validate that a missing gRPC status is handled as an INTERNAL gRPC error. +TEST_F(GrpcAsyncClientImplTest, MissingGrpcStatus) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + Http::HeaderMapPtr reply_trailers{new Http::TestHeaderMapImpl{}}; + EXPECT_CALL(*stream, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers.get()))); + stream->http_callbacks_->onTrailers(std::move(reply_trailers)); +} + +// Validate that a reply terminated without trailers is handled as an INTERNAL +// gRPC error. +TEST_F(GrpcAsyncClientImplTest, ReplyNoTrailers) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->expectGrpcStatus(Status::GrpcStatus::Internal); + Buffer::OwnedImpl reply_buffer(HELLO_REPLY_DATA, HELLO_REPLY_SIZE); + helloworld::HelloReply reply; + reply.set_message(HELLO_REPLY); + EXPECT_CALL(*stream, onReceiveMessage_(HelloworldReplyEq(HELLO_REPLY))); + stream->http_callbacks_->onData(reply_buffer, true); +} + +// Validate that send client initial metadata works. +TEST_F(GrpcAsyncClientImplTest, ClientInitialMetadata) { + TestMetadata initial_metadata = { + {Http::LowerCaseString("foo"), "bar"}, {Http::LowerCaseString("baz"), "blah"}, + }; + createStream(initial_metadata); +} + +// Validate that receiving server initial metadata works. +TEST_F(GrpcAsyncClientImplTest, ServerInitialMetadata) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + TestMetadata initial_metadata = { + {Http::LowerCaseString("foo"), "bar"}, {Http::LowerCaseString("baz"), "blah"}, + }; + stream->sendServerInitialMetadata(initial_metadata); +} + +// Validate that receiving server trailing metadata works. +TEST_F(GrpcAsyncClientImplTest, ServerTrailingMetadata) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + TestMetadata trailing_metadata = { + {Http::LowerCaseString("foo"), "bar"}, {Http::LowerCaseString("baz"), "blah"}, + }; + stream->sendServerTrailers(Status::GrpcStatus::Ok, trailing_metadata); +} + +// Validate that a trailers-only response is handled. +TEST_F(GrpcAsyncClientImplTest, TrailersOnly) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata, true); + stream->closeStream(); +} + +// Validate that a trailers RESOURCE_EXHAUSTED reply is handled. +TEST_F(GrpcAsyncClientImplTest, ResourceExhaustedError) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->sendServerTrailers(Status::GrpcStatus::ResourceExhausted, empty_metadata); +} + +// Validate that we can continue to receive after a local close. +TEST_F(GrpcAsyncClientImplTest, ReceiveAfterLocalClose) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendRequest(); + stream->closeStream(); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata); +} + +// Validate that we can continue to send after a remote close. +TEST_F(GrpcAsyncClientImplTest, SendAfterRemoteClose) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerInitialMetadata(empty_metadata); + stream->sendReply(); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata); + stream->sendRequest(); + stream->closeStream(); +} + +} // namespace +} // namespace Grpc +} // namespace Envoy diff --git a/test/mocks/grpc/BUILD b/test/mocks/grpc/BUILD index 08552031d52fa..8e98d6e50d93a 100644 --- a/test/mocks/grpc/BUILD +++ b/test/mocks/grpc/BUILD @@ -12,5 +12,8 @@ envoy_cc_mock( name = "grpc_mocks", srcs = ["mocks.cc"], hdrs = ["mocks.h"], - deps = ["//include/envoy/grpc:rpc_channel_interface"], + deps = [ + "//include/envoy/grpc:async_client_interface", + "//include/envoy/grpc:rpc_channel_interface", + ], ) diff --git a/test/mocks/grpc/mocks.h b/test/mocks/grpc/mocks.h index 2c64ef644c772..779bad14f9936 100644 --- a/test/mocks/grpc/mocks.h +++ b/test/mocks/grpc/mocks.h @@ -3,6 +3,7 @@ #include #include +#include "envoy/grpc/async_client.h" #include "envoy/grpc/rpc_channel.h" #include "gmock/gmock.h" @@ -10,6 +11,42 @@ namespace Envoy { namespace Grpc { +template class MockAsyncClientStream : public AsyncClientStream { +public: + MOCK_METHOD1_T(sendMessage, void(const RequestType& request)); + MOCK_METHOD0_T(close, void()); + MOCK_METHOD0_T(reset, void()); +}; + +template +class MockAsyncClientCallbacks : public AsyncClientCallbacks { +public: + void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) { + onReceiveInitialMetadata_(*metadata); + } + + void onReceiveMessage(std::unique_ptr&& message) { onReceiveMessage_(*message); } + + void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) { + onReceiveTrailingMetadata_(*metadata); + } + + MOCK_METHOD1_T(onCreateInitialMetadata, void(Http::HeaderMap& metadata)); + MOCK_METHOD1_T(onReceiveInitialMetadata_, void(const Http::HeaderMap& metadata)); + MOCK_METHOD1_T(onReceiveMessage_, void(const ResponseType& message)); + MOCK_METHOD1_T(onReceiveTrailingMetadata_, void(const Http::HeaderMap& metadata)); + MOCK_METHOD1_T(onRemoteClose, void(Status::GrpcStatus status)); +}; + +template +class MockAsyncClient : public AsyncClient { +public: + MOCK_METHOD4_T(start, AsyncClientStream*( + const google::protobuf::MethodDescriptor& service_method, + AsyncClientCallbacks& callbacks, + const Optional& timeout)); +}; + class MockRpcChannelCallbacks : public RpcChannelCallbacks { public: MockRpcChannelCallbacks(); diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index c316cfa002b4f..c70e8415ad8b8 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -388,6 +388,14 @@ class MockAsyncClientRequest : public AsyncClient::Request { MockAsyncClient* client_; }; +class MockAsyncClientStream : public AsyncClient::Stream { +public: + MOCK_METHOD2(sendHeaders, void(HeaderMap& headers, bool end_stream)); + MOCK_METHOD2(sendData, void(Buffer::Instance& data, bool end_stream)); + MOCK_METHOD1(sendTrailers, void(HeaderMap& trailers)); + MOCK_METHOD0(reset, void()); +}; + class MockFilterChainFactoryCallbacks : public Http::FilterChainFactoryCallbacks { public: MockFilterChainFactoryCallbacks(); From d715cae601411fb7ee2137bf086df545446a4fc3 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Mon, 12 Jun 2017 17:10:36 -0400 Subject: [PATCH 02/10] Typo from wattli@. --- test/common/grpc/async_client_impl_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/grpc/async_client_impl_test.cc b/test/common/grpc/async_client_impl_test.cc index bca31f788808a..9842ec21883e9 100644 --- a/test/common/grpc/async_client_impl_test.cc +++ b/test/common/grpc/async_client_impl_test.cc @@ -28,7 +28,7 @@ namespace { const std::string HELLO_REQUEST = "ABC"; // We expect the 5 byte header to only have a length of 5 indicating the size of the protobuf. The -// protobuf beings with 0x0a, indicating this is the first field of type string. This is followed +// protobuf begins with 0x0a, indicating this is the first field of type string. This is followed // by 0x03 for the number of characters and the name ABC set above. const char HELLO_REQUEST_DATA[] = "\x00\x00\x00\x00\x05\x0a\x03\x41\x42\x43"; const size_t HELLO_REQUEST_SIZE = sizeof(HELLO_REQUEST_DATA) - 1; From 482b3bb53f6d7eb49b0c6451362ee733ab7010a4 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 13 Jun 2017 09:30:21 -0400 Subject: [PATCH 03/10] HTTP to gRPC response status code translation. --- source/common/grpc/async_client_impl.h | 14 ++++++--- source/common/grpc/common.cc | 22 +++++++++++++++ source/common/grpc/common.h | 14 +++++++-- test/common/grpc/async_client_impl_test.cc | 33 ++++++++++++++++------ 4 files changed, 68 insertions(+), 15 deletions(-) diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 2bfb3f83faf27..91ebff04adb16 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -71,8 +71,15 @@ class AsyncClientStreamImpl : public AsyncClientStream, // Http::AsyncClient::StreamCallbacks void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override { ASSERT(!remote_closed_); - if (Http::Utility::getResponseStatus(*headers) != enumToInt(Http::Code::OK)) { - streamError(Status::GrpcStatus::Internal); + const auto http_response_status = Http::Utility::getResponseStatus(*headers); + if (http_response_status != enumToInt(Http::Code::OK)) { + // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that + // grpc-status be used if available. + if (end_stream && Common::getGrpcStatus(*headers).valid()) { + onTrailers(std::move(headers)); + return; + } + streamError(Common::httpToGrpcStatus(http_response_status)); return; } if (end_stream) { @@ -111,7 +118,6 @@ class AsyncClientStreamImpl : public AsyncClientStream, void onTrailers(Http::HeaderMapPtr&& trailers) override { ASSERT(!remote_closed_); - callbacks_.onReceiveTrailingMetadata(std::move(trailers)); const Optional grpc_status = Common::getGrpcStatus(*trailers); if (!grpc_status.valid()) { @@ -122,7 +128,7 @@ class AsyncClientStreamImpl : public AsyncClientStream, streamError(grpc_status.value()); return; } - + callbacks_.onReceiveTrailingMetadata(std::move(trailers)); callbacks_.onRemoteClose(Status::GrpcStatus::Ok); closeRemote(); } diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 6ab0d6ff6c155..1e5d11679d995 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -38,6 +38,28 @@ Optional Common::getGrpcStatus(const Http::HeaderMap& traile return Optional(static_cast(grpc_status_code)); } +Status::GrpcStatus Common::httpToGrpcStatus(uint64_t http_response_status) { + // From + // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + switch (http_response_status) { + case 400: + return Status::GrpcStatus::Internal; + case 401: + return Status::GrpcStatus::Unauthenticated; + case 403: + return Status::GrpcStatus::PermissionDenied; + case 404: + return Status::GrpcStatus::Unimplemented; + case 429: + case 502: + case 503: + case 504: + return Status::GrpcStatus::Unavailable; + default: + return Status::GrpcStatus::Unknown; + } +} + void Common::chargeStat(const Upstream::ClusterInfo& cluster, const std::string& grpc_service, const std::string& grpc_method, bool success) { cluster.statsScope() diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index 055fbd0617d83..47314eabdd7eb 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -28,11 +28,21 @@ class Common { public: /** * Returns the GrpcStatus code from a given set of headers, if present. - * @headers the headers to parse. - * @returns the parsed status code or InvalidCode if no valid status is found. + * @param headers the headers to parse. + * @return Optional the parsed status code or InvalidCode if no valid status + * is found. */ static Optional getGrpcStatus(const Http::HeaderMap& headers); + /** + * Returns the gRPC status code from a given HTTP response status code. Ordinarily, it is expected + * that a 200 response is provided, but gRPC defines a mapping for intermediaries that are not + * gRPC aware, see https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. + * @param http_response_status HTTP status code. + * @return Status::GrpcStatus corresponding gRPC status code. + */ + static Status::GrpcStatus httpToGrpcStatus(uint64_t http_response_status); + /** * Charge a success/failure stat to a cluster/service/method. * @param cluster supplies the target cluster. diff --git a/test/common/grpc/async_client_impl_test.cc b/test/common/grpc/async_client_impl_test.cc index 9842ec21883e9..9e04ec2c7e7d8 100644 --- a/test/common/grpc/async_client_impl_test.cc +++ b/test/common/grpc/async_client_impl_test.cc @@ -105,7 +105,9 @@ class HelloworldStream : public MockAsyncClientCallbacks for (auto& value : metadata) { reply_trailers->addStatic(value.first, value.second); } - EXPECT_CALL(*this, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers.get()))); + if (grpc_status == Status::GrpcStatus::Ok) { + EXPECT_CALL(*this, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers.get()))); + } expectGrpcStatus(grpc_status); if (trailers_only) { http_callbacks_->onHeaders(std::move(reply_trailers), true); @@ -155,8 +157,8 @@ class GrpcAsyncClientImplTest : public testing::Test { EXPECT_CALL(stream->http_stream_, sendHeaders(HeaderMapEqualRef(&headers), _)); ON_CALL(http_client_, start(_, _)) - .WillByDefault(Invoke([this, &stream](Http::AsyncClient::StreamCallbacks& callbacks, - const Optional& timeout) { + .WillByDefault(Invoke([&stream](Http::AsyncClient::StreamCallbacks& callbacks, + const Optional& timeout) { UNREFERENCED_PARAMETER(timeout); stream->http_callbacks_ = &callbacks; return &stream->http_stream_; @@ -209,13 +211,28 @@ TEST_F(GrpcAsyncClientImplTest, HttpStartFail) { EXPECT_EQ(grpc_stream, nullptr); } -// Validate that a non-200 HTTP status results in an INTERNAL gRPC error. +// Validate that a non-200 HTTP status results in the gRPC error as per +// https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. TEST_F(GrpcAsyncClientImplTest, HttpNon200Status) { + for (const auto http_response_status : {400, 401, 403, 404, 429, 431}) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + Http::HeaderMapPtr reply_headers{ + new Http::TestHeaderMapImpl{{":status", std::to_string(http_response_status)}}}; + stream->expectGrpcStatus(Common::httpToGrpcStatus(http_response_status)); + stream->http_callbacks_->onHeaders(std::move(reply_headers), false); + } +} + +// Validate that a non-200 HTTP status results in fallback to grpc-status. +TEST_F(GrpcAsyncClientImplTest, GrpcStatusFallback) { TestMetadata empty_metadata; auto stream = createStream(empty_metadata); - Http::HeaderMapPtr reply_headers{new Http::TestHeaderMapImpl{{":status", "404"}}}; - stream->expectGrpcStatus(Status::GrpcStatus::Internal); - stream->http_callbacks_->onHeaders(std::move(reply_headers), false); + Http::HeaderMapPtr reply_headers{new Http::TestHeaderMapImpl{ + {":status", "404"}, + {"grpc-status", std::to_string(enumToInt(Status::GrpcStatus::PermissionDenied))}}}; + stream->expectGrpcStatus(Status::GrpcStatus::PermissionDenied); + stream->http_callbacks_->onHeaders(std::move(reply_headers), true); } // Validate that a HTTP-level reset is handled as an INTERNAL gRPC error. @@ -259,7 +276,6 @@ TEST_F(GrpcAsyncClientImplTest, OutOfRangeGrpcStatus) { stream->expectGrpcStatus(Status::GrpcStatus::InvalidCode); Http::HeaderMapPtr reply_trailers{ new Http::TestHeaderMapImpl{{"grpc-status", std::to_string(0x1337)}}}; - EXPECT_CALL(*stream, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers.get()))); stream->http_callbacks_->onTrailers(std::move(reply_trailers)); } @@ -271,7 +287,6 @@ TEST_F(GrpcAsyncClientImplTest, MissingGrpcStatus) { stream->sendReply(); stream->expectGrpcStatus(Status::GrpcStatus::Internal); Http::HeaderMapPtr reply_trailers{new Http::TestHeaderMapImpl{}}; - EXPECT_CALL(*stream, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers.get()))); stream->http_callbacks_->onTrailers(std::move(reply_trailers)); } From 2a608defc881991c571d43430101e7f6e65674f2 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 13 Jun 2017 15:26:41 -0400 Subject: [PATCH 04/10] ASAN fixes. --- source/common/grpc/async_client_impl.h | 7 ++-- source/common/grpc/common.cc | 30 +++++++++--------- test/common/grpc/async_client_impl_test.cc | 37 +++++++++++++++++----- 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 91ebff04adb16..d419073fde02e 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -149,8 +149,11 @@ class AsyncClientStreamImpl : public AsyncClientStream, void close() override { closeLocal(); } void reset() override { - closeLocal(); - closeRemote(); + // Both closeLocal() and closeRemote() might self-destruct the object. We don't use these below + // to avoid sequencing issues. + local_closed_ |= true; + remote_closed_ |= true; + cleanup(); } void set_stream(Http::AsyncClient::Stream* stream) { stream_ = stream; } diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 1e5d11679d995..3bf6a107242f2 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -42,21 +42,21 @@ Status::GrpcStatus Common::httpToGrpcStatus(uint64_t http_response_status) { // From // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. switch (http_response_status) { - case 400: - return Status::GrpcStatus::Internal; - case 401: - return Status::GrpcStatus::Unauthenticated; - case 403: - return Status::GrpcStatus::PermissionDenied; - case 404: - return Status::GrpcStatus::Unimplemented; - case 429: - case 502: - case 503: - case 504: - return Status::GrpcStatus::Unavailable; - default: - return Status::GrpcStatus::Unknown; + case 400: + return Status::GrpcStatus::Internal; + case 401: + return Status::GrpcStatus::Unauthenticated; + case 403: + return Status::GrpcStatus::PermissionDenied; + case 404: + return Status::GrpcStatus::Unimplemented; + case 429: + case 502: + case 503: + case 504: + return Status::GrpcStatus::Unavailable; + default: + return Status::GrpcStatus::Unknown; } } diff --git a/test/common/grpc/async_client_impl_test.cc b/test/common/grpc/async_client_impl_test.cc index 9e04ec2c7e7d8..4d80457069779 100644 --- a/test/common/grpc/async_client_impl_test.cc +++ b/test/common/grpc/async_client_impl_test.cc @@ -97,22 +97,23 @@ class HelloworldStream : public MockAsyncClientCallbacks void sendServerTrailers(Status::GrpcStatus grpc_status, TestMetadata metadata, bool trailers_only = false) { - Http::HeaderMapPtr reply_trailers{ - new Http::TestHeaderMapImpl{{"grpc-status", std::to_string(enumToInt(grpc_status))}}}; + auto* reply_trailers = + new Http::TestHeaderMapImpl{{"grpc-status", std::to_string(enumToInt(grpc_status))}}; if (trailers_only) { - reply_trailers->addStatic(Http::LowerCaseString(":status"), "200"); + reply_trailers->addViaCopy(":status", "200"); } - for (auto& value : metadata) { - reply_trailers->addStatic(value.first, value.second); + for (const auto& value : metadata) { + reply_trailers->addViaCopy(value.first, value.second); } + Http::HeaderMapPtr reply_trailers_ptr{reply_trailers}; if (grpc_status == Status::GrpcStatus::Ok) { - EXPECT_CALL(*this, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers.get()))); + EXPECT_CALL(*this, onReceiveTrailingMetadata_(HeaderMapEqualRef(reply_trailers))); } expectGrpcStatus(grpc_status); if (trailers_only) { - http_callbacks_->onHeaders(std::move(reply_trailers), true); + http_callbacks_->onHeaders(std::move(reply_trailers_ptr), true); } else { - http_callbacks_->onTrailers(std::move(reply_trailers)); + http_callbacks_->onTrailers(std::move(reply_trailers_ptr)); } } @@ -376,6 +377,26 @@ TEST_F(GrpcAsyncClientImplTest, SendAfterRemoteClose) { stream->closeStream(); } +// Validate that reset() doesn't explode on a half-closed stream (local). +TEST_F(GrpcAsyncClientImplTest, resetAfterCloseLocal) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->grpc_stream_->close(); + EXPECT_CALL(stream->http_stream_, reset()); + stream->grpc_stream_->reset(); + stream->clearStream(); +} + +// Validate that reset() doesn't explode on a half-closed stream (remote). +TEST_F(GrpcAsyncClientImplTest, resetAfterCloseRemote) { + TestMetadata empty_metadata; + auto stream = createStream(empty_metadata); + stream->sendServerTrailers(Status::GrpcStatus::Ok, empty_metadata, true); + EXPECT_CALL(stream->http_stream_, reset()); + stream->grpc_stream_->reset(); + stream->clearStream(); +} + } // namespace } // namespace Grpc } // namespace Envoy From 36b0c68c40555aebdfc110a56981cf612900bd7a Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 13 Jun 2017 16:55:53 -0400 Subject: [PATCH 05/10] Deprecation for RpcChannel[Impl]. --- include/envoy/grpc/rpc_channel.h | 1 + source/common/grpc/rpc_channel_impl.h | 1 + 2 files changed, 2 insertions(+) diff --git a/include/envoy/grpc/rpc_channel.h b/include/envoy/grpc/rpc_channel.h index 07bee12f15214..4d13634d5dd6e 100644 --- a/include/envoy/grpc/rpc_channel.h +++ b/include/envoy/grpc/rpc_channel.h @@ -48,6 +48,7 @@ class RpcChannelCallbacks { * can accept a RequestCallbacks object. An RpcChannel should be passed to the constructor of an RPC * stub generated via protoc using the "option cc_generic_services = true;" option. It can be used * for multiple service calls, but not concurrently. + * DEPRECATED: See https://github.com/lyft/envoy/issues/1102 */ class RpcChannel : public proto::RpcChannel { public: diff --git a/source/common/grpc/rpc_channel_impl.h b/source/common/grpc/rpc_channel_impl.h index 41884e31c065b..f6678e25ac1a2 100644 --- a/source/common/grpc/rpc_channel_impl.h +++ b/source/common/grpc/rpc_channel_impl.h @@ -25,6 +25,7 @@ namespace Grpc { * needed. * 4) Inflight RPCs can be safely cancelled using cancel(). * 5) See GrpcRequestImplTest for an example. + * DEPRECATED: See https://github.com/lyft/envoy/issues/1102 */ class RpcChannelImpl : public RpcChannel, public Http::AsyncClient::Callbacks { public: From 52c0985b8b2be715440e93481573095beec07e64 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 13 Jun 2017 17:34:34 -0400 Subject: [PATCH 06/10] Handle sendHeaders() failure in AsyncClient start(). --- source/common/grpc/async_client_impl.h | 7 +++- test/common/grpc/async_client_impl_test.cc | 39 ++++++++++++++++++---- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index d419073fde02e..5934c06b35d8b 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -46,8 +46,12 @@ class AsyncClientImpl final : public AsyncClient { callbacks.onCreateInitialMetadata(message->headers()); http_stream->sendHeaders(message->headers(), false); - grpc_stream->moveIntoList(std::move(grpc_stream), active_streams_); + // If sendHeaders() caused a reset, onRemoteClose() has been called inline and we should bail. + if (grpc_stream->http_reset_) { + return nullptr; + } + grpc_stream->moveIntoList(std::move(grpc_stream), active_streams_); return active_streams_.front().get(); } @@ -138,6 +142,7 @@ class AsyncClientStreamImpl : public AsyncClientStream, return; } + http_reset_ = true; streamError(Status::GrpcStatus::Internal); } diff --git a/test/common/grpc/async_client_impl_test.cc b/test/common/grpc/async_client_impl_test.cc index 4d80457069779..bba32f184d287 100644 --- a/test/common/grpc/async_client_impl_test.cc +++ b/test/common/grpc/async_client_impl_test.cc @@ -155,15 +155,14 @@ class GrpcAsyncClientImplTest : public testing::Test { for (auto& value : initial_metadata) { headers.addStatic(value.first, value.second); } - EXPECT_CALL(stream->http_stream_, sendHeaders(HeaderMapEqualRef(&headers), _)); - - ON_CALL(http_client_, start(_, _)) - .WillByDefault(Invoke([&stream](Http::AsyncClient::StreamCallbacks& callbacks, - const Optional& timeout) { + EXPECT_CALL(http_client_, start(_, _)) + .WillOnce(Invoke([&stream](Http::AsyncClient::StreamCallbacks& callbacks, + const Optional& timeout) { UNREFERENCED_PARAMETER(timeout); stream->http_callbacks_ = &callbacks; return &stream->http_stream_; })); + EXPECT_CALL(stream->http_stream_, sendHeaders(HeaderMapEqualRef(&headers), _)); stream->grpc_stream_ = grpc_client_->start(*method_descriptor_, *stream, Optional()); EXPECT_NE(stream->grpc_stream_, nullptr); @@ -212,6 +211,33 @@ TEST_F(GrpcAsyncClientImplTest, HttpStartFail) { EXPECT_EQ(grpc_stream, nullptr); } +// Validate that a failure to sendHeaders() in the HTTP client returns +// immediately with status INTERNAL. +TEST_F(GrpcAsyncClientImplTest, HttpSendHeadersFail) { + MockAsyncClientCallbacks grpc_callbacks; + Http::AsyncClient::StreamCallbacks* http_callbacks; + Http::MockAsyncClientStream http_stream; + EXPECT_CALL(http_client_, start(_, _)) + .WillOnce(Invoke( + [&http_callbacks, &http_stream](Http::AsyncClient::StreamCallbacks& callbacks, + const Optional& timeout) { + UNREFERENCED_PARAMETER(timeout); + http_callbacks = &callbacks; + return &http_stream; + })); + EXPECT_CALL(grpc_callbacks, onCreateInitialMetadata(_)); + EXPECT_CALL(http_stream, sendHeaders(_, _)) + .WillOnce(Invoke([&http_callbacks](Http::HeaderMap& headers, bool end_stream) { + UNREFERENCED_PARAMETER(headers); + UNREFERENCED_PARAMETER(end_stream); + http_callbacks->onReset(); + })); + EXPECT_CALL(grpc_callbacks, onRemoteClose(Status::GrpcStatus::Internal)); + auto* grpc_stream = grpc_client_->start(*method_descriptor_, grpc_callbacks, + Optional()); + EXPECT_EQ(grpc_stream, nullptr); +} + // Validate that a non-200 HTTP status results in the gRPC error as per // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md. TEST_F(GrpcAsyncClientImplTest, HttpNon200Status) { @@ -240,8 +266,9 @@ TEST_F(GrpcAsyncClientImplTest, GrpcStatusFallback) { TEST_F(GrpcAsyncClientImplTest, HttpReset) { TestMetadata empty_metadata; auto stream = createStream(empty_metadata); - stream->expectGrpcStatus(Status::GrpcStatus::Internal); + EXPECT_CALL(*stream, onRemoteClose(Status::GrpcStatus::Internal)); stream->http_callbacks_->onReset(); + stream->clearStream(); } // Validate that a reply with bad gRPC framing is handled as an INTERNAL gRPC From 5ac189607a9aa768cf457557059094c5c1e7bf98 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 13 Jun 2017 17:37:30 -0400 Subject: [PATCH 07/10] Avoid reallocating onData frames vector. --- source/common/grpc/async_client_impl.h | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 5934c06b35d8b..cb38728b26148 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -95,13 +95,13 @@ class AsyncClientStreamImpl : public AsyncClientStream, void onData(Buffer::Instance& data, bool end_stream) override { ASSERT(!remote_closed_); - std::vector frames; - if (!decoder_.decode(data, frames)) { + decoded_frames_.clear(); + if (!decoder_.decode(data, decoded_frames_)) { streamError(Status::GrpcStatus::Internal); return; } - for (const auto& frame : frames) { + for (const auto& frame : decoded_frames_) { std::unique_ptr response(new ResponseType()); // TODO(htuch): We can avoid linearizing the buffer here when Buffer::Instance implements // protobuf ZeroCopyInputStream. @@ -206,6 +206,8 @@ class AsyncClientStreamImpl : public AsyncClientStream, bool http_reset_{}; Http::AsyncClient::Stream* stream_{}; Decoder decoder_; + // This is a member to avoid reallocation on every onData(). + std::vector decoded_frames_; friend class AsyncClientImpl; }; From d21b76b962238a8f9d6bbdb6bea0db98225f7eb0 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 13 Jun 2017 17:38:21 -0400 Subject: [PATCH 08/10] Handle end_stream early in onData(). --- source/common/grpc/async_client_impl.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index cb38728b26148..50a115ce24ce0 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -95,6 +95,11 @@ class AsyncClientStreamImpl : public AsyncClientStream, void onData(Buffer::Instance& data, bool end_stream) override { ASSERT(!remote_closed_); + if (end_stream) { + streamError(Status::GrpcStatus::Internal); + return; + } + decoded_frames_.clear(); if (!decoder_.decode(data, decoded_frames_)) { streamError(Status::GrpcStatus::Internal); @@ -114,10 +119,6 @@ class AsyncClientStreamImpl : public AsyncClientStream, } callbacks_.onReceiveMessage(std::move(response)); } - if (end_stream) { - streamError(Status::GrpcStatus::Internal); - return; - } } void onTrailers(Http::HeaderMapPtr&& trailers) override { From 211836a3d0b580100a1ff4198d39f9107a3b9fc4 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 13 Jun 2017 17:39:34 -0400 Subject: [PATCH 09/10] Move using statements outside Envoy namespace. --- test/common/grpc/async_client_impl_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/common/grpc/async_client_impl_test.cc b/test/common/grpc/async_client_impl_test.cc index bba32f184d287..76daf0600d332 100644 --- a/test/common/grpc/async_client_impl_test.cc +++ b/test/common/grpc/async_client_impl_test.cc @@ -10,7 +10,6 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -namespace Envoy { using testing::_; using testing::Invoke; using testing::Eq; @@ -19,6 +18,7 @@ using testing::Return; using testing::ReturnRef; using testing::Mock; +namespace Envoy { namespace Grpc { template class AsyncClientImpl; @@ -329,7 +329,6 @@ TEST_F(GrpcAsyncClientImplTest, ReplyNoTrailers) { Buffer::OwnedImpl reply_buffer(HELLO_REPLY_DATA, HELLO_REPLY_SIZE); helloworld::HelloReply reply; reply.set_message(HELLO_REPLY); - EXPECT_CALL(*stream, onReceiveMessage_(HelloworldReplyEq(HELLO_REPLY))); stream->http_callbacks_->onData(reply_buffer, true); } From a43d5c668bdb07abadedc37781a47db13ca02c18 Mon Sep 17 00:00:00 2001 From: Harvey Tuch Date: Tue, 13 Jun 2017 17:45:25 -0400 Subject: [PATCH 10/10] Constructor/destructor in .cc for better gmock compile performance. --- test/mocks/http/mocks.cc | 3 +++ test/mocks/http/mocks.h | 3 +++ 2 files changed, 6 insertions(+) diff --git a/test/mocks/http/mocks.cc b/test/mocks/http/mocks.cc index 2b56c660d3cde..77e63cdd10259 100644 --- a/test/mocks/http/mocks.cc +++ b/test/mocks/http/mocks.cc @@ -131,6 +131,9 @@ MockAsyncClientStreamCallbacks::~MockAsyncClientStreamCallbacks() {} MockAsyncClientRequest::MockAsyncClientRequest(MockAsyncClient* client) : client_(client) {} MockAsyncClientRequest::~MockAsyncClientRequest() { client_->onRequestDestroy(); } +MockAsyncClientStream::MockAsyncClientStream() {} +MockAsyncClientStream::~MockAsyncClientStream() {} + MockFilterChainFactoryCallbacks::MockFilterChainFactoryCallbacks() {} MockFilterChainFactoryCallbacks::~MockFilterChainFactoryCallbacks() {} diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index c70e8415ad8b8..797de126117fb 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -390,6 +390,9 @@ class MockAsyncClientRequest : public AsyncClient::Request { class MockAsyncClientStream : public AsyncClient::Stream { public: + MockAsyncClientStream(); + ~MockAsyncClientStream(); + MOCK_METHOD2(sendHeaders, void(HeaderMap& headers, bool end_stream)); MOCK_METHOD2(sendData, void(Buffer::Instance& data, bool end_stream)); MOCK_METHOD1(sendTrailers, void(HeaderMap& trailers));