From 42d4dcb45efdcd4ab121a846e332da9489d1b4e5 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Sat, 27 Apr 2019 10:44:12 -0700 Subject: [PATCH 01/17] Add utilities for converting between grpc::ByteBuffer and Buffer::Instance and for adding gRPC frame headers to Buffer::Instances. Signed-off-by: John Plevyak --- source/common/grpc/BUILD | 8 +- source/common/grpc/async_client_impl.cc | 2 +- source/common/grpc/common.cc | 112 +++++++++++++++++- source/common/grpc/common.h | 31 ++++- source/common/upstream/health_checker_impl.cc | 2 +- .../lightstep/lightstep_tracer_impl.cc | 2 +- test/common/grpc/common_test.cc | 75 ++++++++++++ .../grpc/grpc_client_integration_test.cc | 2 +- .../upstream/health_checker_impl_test.cc | 4 +- .../grpc_json_transcoder_integration_test.cc | 2 +- .../json_transcoder_filter_test.cc | 14 +-- .../lightstep/lightstep_tracer_impl_test.cc | 2 +- test/integration/fake_upstream.h | 2 +- 13 files changed, 238 insertions(+), 20 deletions(-) diff --git a/source/common/grpc/BUILD b/source/common/grpc/BUILD index f0471c3568ee3..1acb3d414fd40 100644 --- a/source/common/grpc/BUILD +++ b/source/common/grpc/BUILD @@ -28,6 +28,7 @@ envoy_cc_library( hdrs = ["async_client_manager_impl.h"], deps = [ ":async_client_lib", + ":common_lib", "//include/envoy/grpc:async_client_manager_interface", "//include/envoy/singleton:manager_interface", "//include/envoy/thread_local:thread_local_interface", @@ -61,7 +62,10 @@ envoy_cc_library( name = "common_lib", srcs = ["common.cc"], hdrs = ["common.h"], - external_deps = ["abseil_optional"], + external_deps = [ + "abseil_optional", + "grpc", + ], deps = [ "//include/envoy/http:header_map_interface", "//include/envoy/http:message_interface", @@ -91,6 +95,8 @@ envoy_cc_library( "grpc", ], deps = [ + ":async_client_lib", + ":common_lib", ":google_grpc_creds_lib", "//include/envoy/api:api_interface", "//include/envoy/grpc:google_grpc_creds_interface", diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index c48a95a8771e9..443e5c943c248 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -177,7 +177,7 @@ void AsyncStreamImpl::onReset() { } void AsyncStreamImpl::sendMessage(const Protobuf::Message& request, bool end_stream) { - stream_->sendData(*Common::serializeBody(request), end_stream); + stream_->sendData(*Common::serializeToGrpcFrame(request), end_stream); } void AsyncStreamImpl::closeStream() { diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index ee123d02ff01d..0065e988592e8 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -12,6 +13,7 @@ #include "common/common/enum_to_int.h" #include "common/common/fmt.h" #include "common/common/macros.h" +#include "common/common/stack_array.h" #include "common/common/utility.h" #include "common/http/headers.h" #include "common/http/message_impl.h" @@ -111,7 +113,7 @@ bool Common::resolveServiceAndMethod(const Http::HeaderEntry* path, std::string* return true; } -Buffer::InstancePtr Common::serializeBody(const Protobuf::Message& message) { +Buffer::InstancePtr Common::serializeToGrpcFrame(const Protobuf::Message& message) { // http://www.grpc.io/docs/guides/wire.html // Reserve enough space for the entire message and the 5 byte header. Buffer::InstancePtr body(new Buffer::OwnedImpl()); @@ -133,6 +135,21 @@ Buffer::InstancePtr Common::serializeBody(const Protobuf::Message& message) { return body; } +Buffer::InstancePtr Common::serializeMessage(const Protobuf::Message& message) { + Buffer::InstancePtr body(new Buffer::OwnedImpl()); + const uint32_t size = message.ByteSize(); + Buffer::RawSlice iovec; + body->reserve(size, &iovec, 1); + ASSERT(iovec.len_ >= size); + iovec.len_ = size; + uint8_t* current = reinterpret_cast(iovec.mem_); + Protobuf::io::ArrayOutputStream stream(current, size, -1); + Protobuf::io::CodedOutputStream codec_stream(&stream); + message.SerializeWithCachedSizes(&codec_stream); + body->commit(&iovec, 1); + return body; +} + std::chrono::milliseconds Common::getGrpcTimeout(Http::HeaderMap& request_headers) { std::chrono::milliseconds timeout(0); Http::HeaderEntry* header_grpc_timeout_entry = request_headers.GrpcTimeout(); @@ -269,5 +286,98 @@ std::string Common::typeUrl(const std::string& qualified_name) { return typeUrlPrefix() + "/" + qualified_name; } +struct BufferInstanceContainer { + BufferInstanceContainer(int ref_count, Buffer::InstancePtr buffer) + : ref_count_(ref_count), buffer_(std::move(buffer)) {} + std::atomic ref_count_; + Buffer::InstancePtr buffer_; +}; + +static void derefBufferInstanceContainer(void* container_ptr) { + auto container = reinterpret_cast(container_ptr); + container->ref_count_--; + if (container->ref_count_ <= 0) { + delete container; + } +} + +grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr bufferInstance) { + if (!bufferInstance) { + return {}; + } + Buffer::RawSlice oneRawSlice; + // NB: we need to pass in >= 1 in order to get the real "n" (see Buffer::Instance for details). + int nSlices = bufferInstance->getRawSlices(&oneRawSlice, 1); + if (nSlices <= 0) { + return {}; + } + auto container = new BufferInstanceContainer{nSlices, std::move(bufferInstance)}; + if (nSlices == 1) { + grpc::Slice oneSlice(oneRawSlice.mem_, oneRawSlice.len_, &derefBufferInstanceContainer, + container); + return {&oneSlice, 1}; + } + STACK_ARRAY(manyRawSlices, Buffer::RawSlice, nSlices); + bufferInstance->getRawSlices(manyRawSlices.begin(), nSlices); + std::vector slices; + slices.reserve(nSlices); + for (int i = 0; i < nSlices; i++) { + slices.emplace_back(manyRawSlices[i].mem_, manyRawSlices[i].len_, &derefBufferInstanceContainer, + container); + } + return {&slices[0], slices.size()}; +} + +struct ByteBufferContainer { + ByteBufferContainer(int ref_count) : ref_count_(ref_count) {} + ~ByteBufferContainer() { ::free(fragments); } + std::atomic ref_count_; + Buffer::BufferFragmentImpl* fragments = nullptr; + std::vector slices_; +}; + +Buffer::InstancePtr Common::makeBufferInstance(const grpc::ByteBuffer& byteBuffer) { + auto buffer = std::make_unique(); + if (byteBuffer.Length() == 0) { + return buffer; + } + // NB: ByteBuffer::Dump moves the data out of the ByteBuffer so we need to ensure that the + // lifetime of the Slice(s) exceeds our Buffer::Instance. + std::vector slices; + byteBuffer.Dump(&slices); + if (slices.size() == 0) { + return buffer; + } + auto container = new ByteBufferContainer(static_cast(slices.size())); + std::function releaser = + [container](const void*, size_t, const Buffer::BufferFragmentImpl*) { + container->ref_count_--; + if (container->ref_count_ <= 0) { + delete container; + } + }; + // NB: addBufferFragment takes a pointer alias to the BufferFragmentImpl which is passed in so we + // need to ensure that the lifetime of those objects exceeds that of the Buffer::Instance. + container->fragments = static_cast( + ::malloc(sizeof(Buffer::BufferFragmentImpl) * slices.size())); + for (size_t i = 0; i < slices.size(); i++) { + new (&container->fragments[i]) + Buffer::BufferFragmentImpl(slices[i].begin(), slices[i].size(), releaser); + } + for (size_t i = 0; i < slices.size(); i++) { + buffer->addBufferFragment(container->fragments[i]); + } + container->slices_ = std::move(slices); + return buffer; +} + +void Common::PrependGrpcFrameHeader(Buffer::Instance* buffer) { + char header[5]; + header[0] = 0; // flags + const uint32_t nsize = htonl(buffer->length()); + std::memcpy(&header[1], reinterpret_cast(&nsize), sizeof(uint32_t)); + buffer->prepend(absl::string_view(header, 5)); +} + } // namespace Grpc } // namespace Envoy diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index 7a7399c1679c0..b617792fecba9 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -13,6 +13,7 @@ #include "common/protobuf/protobuf.h" #include "absl/types/optional.h" +#include "grpcpp/grpcpp.h" namespace Envoy { namespace Grpc { @@ -119,9 +120,14 @@ class Common { std::string* method); /** - * Serialize protobuf message. + * Serialize protobuf message. With grpc header. */ - static Buffer::InstancePtr serializeBody(const Protobuf::Message& message); + static Buffer::InstancePtr serializeToGrpcFrame(const Protobuf::Message& message); + + /** + * Serialize protobuf message. Without grpc header. + */ + static Buffer::InstancePtr serializeMessage(const Protobuf::Message& message); /** * Prepare headers for protobuf service. @@ -148,6 +154,27 @@ class Common { */ static std::string typeUrl(const std::string& qualified_name); + /** + * BUild grpc::ByteBuffer which aliases the data in a Buffer::InstancePtr. + * @param bufferInstance source data container. + * @return byteBuffer target container aliased to the data in Buffer::Instance and owning the + * Buffer::Instance. + */ + static grpc::ByteBuffer makeByteBuffer(Buffer::InstancePtr bufferInstance); + + /** + * Build Buffer::Instance which aliases the data in a grpc::ByteBuffer. + * @param byteBuffer source data container. + * @param Buffer::InstancePtr target container aliased to the data in grpc::ByteBuffer. + */ + static Buffer::InstancePtr makeBufferInstance(const grpc::ByteBuffer& byteBuffer); + + /** + * Prepend a gRPC frame header to a Buffer::Instance containing a single gRPC frame. + * @param bufferInstance containing the frame data which will be modified. + */ + static void PrependGrpcFrameHeader(Buffer::Instance* buffer); + private: static void checkForHeaderOnlyError(Http::Message& http_response); }; diff --git a/source/common/upstream/health_checker_impl.cc b/source/common/upstream/health_checker_impl.cc index 719975959a3e6..269aae2430779 100644 --- a/source/common/upstream/health_checker_impl.cc +++ b/source/common/upstream/health_checker_impl.cc @@ -567,7 +567,7 @@ void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onInterval() { request.set_service(parent_.service_name_.value()); } - request_encoder_->encodeData(*Grpc::Common::serializeBody(request), true); + request_encoder_->encodeData(*Grpc::Common::serializeToGrpcFrame(request), true); } void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onResetStream(Http::StreamResetReason, diff --git a/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc b/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc index d1aa06dd93ee8..b604f83b4dda0 100644 --- a/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc +++ b/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc @@ -64,7 +64,7 @@ void LightStepDriver::LightStepTransporter::Send(const Protobuf::Message& reques Http::MessagePtr message = Grpc::Common::prepareHeaders( driver_.cluster()->name(), lightstep::CollectorServiceFullName(), lightstep::CollectorMethodName(), absl::optional(timeout)); - message->body() = Grpc::Common::serializeBody(request); + message->body() = Grpc::Common::serializeToGrpcFrame(request); active_request_ = driver_.clusterManager() diff --git a/test/common/grpc/common_test.cc b/test/common/grpc/common_test.cc index 290328a53941a..744c24fb8aec8 100644 --- a/test/common/grpc/common_test.cc +++ b/test/common/grpc/common_test.cc @@ -1,3 +1,5 @@ +#include + #include "common/grpc/common.h" #include "common/http/headers.h" #include "common/http/message_impl.h" @@ -349,5 +351,78 @@ TEST(GrpcCommonTest, ValidateResponse) { } } +TEST(GrpcCommonTest, MakeBufferInstanceEmpty) { + grpc::ByteBuffer byteBuffer; + Common::makeBufferInstance(byteBuffer); +} + +TEST(GrpcCommonTest, MakeByteBufferEmpty) { + auto buffer = std::make_unique(); + Common::makeByteBuffer(std::move(buffer)); +} + +TEST(GrpcCommonTest, MakeBufferInstance1) { + grpc::Slice slice("test"); + grpc::ByteBuffer byteBuffer(&slice, 1); + auto bufferInstance = Common::makeBufferInstance(byteBuffer); + EXPECT_EQ(bufferInstance->toString(), "test"); +} + +TEST(GrpcCommonTest, MakeBufferInstance3) { + grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; + grpc::ByteBuffer byteBuffer(slices, 3); + auto bufferInstance = Common::makeBufferInstance(byteBuffer); + EXPECT_EQ(bufferInstance->toString(), "test this"); +} + +TEST(GrpcCommonTest, MakeByteBuffer1) { + auto buffer = std::make_unique(); + buffer->add("test", 4); + auto byteBuffer = Common::makeByteBuffer(std::move(buffer)); + std::vector slices; + byteBuffer.Dump(&slices); + std::string str; + for (auto& s : slices) { + str.append(std::string(reinterpret_cast(s.begin()), s.size())); + } + EXPECT_EQ(str, "test"); +} + +TEST(GrpcCommonTest, MakeByteBuffer3) { + auto buffer = std::make_unique(); + buffer->add("test", 4); + buffer->add(" ", 1); + buffer->add("this", 4); + auto byteBuffer = Common::makeByteBuffer(std::move(buffer)); + std::vector slices; + byteBuffer.Dump(&slices); + std::string str; + for (auto& s : slices) { + str.append(std::string(reinterpret_cast(s.begin()), s.size())); + } + EXPECT_EQ(str, "test this"); +} + +TEST(GrpcCommonTest, ByteBufferInstanceRoundTrip) { + grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; + grpc::ByteBuffer byteBuffer1(slices, 3); + auto bufferInstance1 = Common::makeBufferInstance(byteBuffer1); + auto byteBuffer2 = Common::makeByteBuffer(std::move(bufferInstance1)); + auto bufferInstance2 = Common::makeBufferInstance(byteBuffer2); + EXPECT_EQ(bufferInstance2->toString(), "test this"); +} + +TEST(GrpcCommonTest, PreependGrpcFrameHeader) { + auto buffer = std::make_unique(); + buffer->add("test", 4); + char expected_header[5]; + expected_header[0] = 0; // flags + const uint32_t nsize = htonl(4); + std::memcpy(&expected_header[1], reinterpret_cast(&nsize), sizeof(uint32_t)); + std::string header_string(expected_header, 5); + Common::PrependGrpcFrameHeader(buffer.get()); + EXPECT_EQ(buffer->toString(), header_string + "test"); +} + } // namespace Grpc } // namespace Envoy diff --git a/test/common/grpc/grpc_client_integration_test.cc b/test/common/grpc/grpc_client_integration_test.cc index d7d85eb45ffc6..5a222b571cedc 100644 --- a/test/common/grpc/grpc_client_integration_test.cc +++ b/test/common/grpc/grpc_client_integration_test.cc @@ -188,7 +188,7 @@ TEST_P(GrpcClientIntegrationTest, ReplyNoTrailers) { dispatcher_helper_.setStreamEventPending(); stream->expectTrailingMetadata(empty_metadata_); stream->expectGrpcStatus(Status::GrpcStatus::InvalidCode); - auto serialized_response = Grpc::Common::serializeBody(reply); + auto serialized_response = Grpc::Common::serializeToGrpcFrame(reply); stream->fake_stream_->encodeData(*serialized_response, true); stream->fake_stream_->encodeResetStream(); dispatcher_helper_.runDispatcher(); diff --git a/test/common/upstream/health_checker_impl_test.cc b/test/common/upstream/health_checker_impl_test.cc index 0e8d56ca695bc..f279752f07e66 100644 --- a/test/common/upstream/health_checker_impl_test.cc +++ b/test/common/upstream/health_checker_impl_test.cc @@ -2608,7 +2608,7 @@ class GrpcHealthCheckerImplTestBase { serializeResponse(grpc::health::v1::HealthCheckResponse::ServingStatus status) { grpc::health::v1::HealthCheckResponse response; response.set_status(status); - const auto data = Grpc::Common::serializeBody(response); + const auto data = Grpc::Common::serializeToGrpcFrame(response); auto ret = std::vector(data->length(), 0); data->copyOut(0, data->length(), &ret[0]); return ret; @@ -2925,7 +2925,7 @@ TEST_F(GrpcHealthCheckerImplTest, SuccessResponseSplitBetweenChunks) { grpc::health::v1::HealthCheckResponse response; response.set_status(grpc::health::v1::HealthCheckResponse::SERVING); - auto data = Grpc::Common::serializeBody(response); + auto data = Grpc::Common::serializeToGrpcFrame(response); const char* raw_data = static_cast(data->linearize(data->length())); const uint64_t chunk_size = data->length() / 5; diff --git a/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc b/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc index 6f8450b441d49..eb94cdd34dc00 100644 --- a/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc +++ b/test/extensions/filters/http/grpc_json_transcoder/grpc_json_transcoder_integration_test.cc @@ -109,7 +109,7 @@ class GrpcJsonTranscoderIntegrationTest for (const auto& response_message_str : grpc_response_messages) { ResponseType response_message; EXPECT_TRUE(TextFormat::ParseFromString(response_message_str, &response_message)); - auto buffer = Grpc::Common::serializeBody(response_message); + auto buffer = Grpc::Common::serializeToGrpcFrame(response_message); upstream_request_->encodeData(*buffer, false); } Http::TestHeaderMapImpl response_trailers; diff --git a/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc b/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc index b6af82749c8db..9d1000158d24c 100644 --- a/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc +++ b/test/extensions/filters/http/grpc_json_transcoder/json_transcoder_filter_test.cc @@ -390,7 +390,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPost) { response.set_id(20); response.set_theme("Children"); - auto response_data = Grpc::Common::serializeBody(response); + auto response_data = Grpc::Common::serializeToGrpcFrame(response); EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, filter_.encodeData(*response_data, false)); @@ -454,7 +454,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPostWithPackageServiceMetho response.set_id(20); response.set_theme("Children"); - auto response_data = Grpc::Common::serializeBody(response); + auto response_data = Grpc::Common::serializeToGrpcFrame(response); EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, filter_.encodeData(*response_data, false)); @@ -482,7 +482,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, ForwardUnaryPostGrpc) { bookstore::CreateShelfRequest request; request.mutable_shelf()->set_theme("Children"); - Buffer::InstancePtr request_data = Grpc::Common::serializeBody(request); + Buffer::InstancePtr request_data = Grpc::Common::serializeToGrpcFrame(request); EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.decodeData(*request_data, true)); Grpc::Decoder decoder; @@ -517,7 +517,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, ForwardUnaryPostGrpc) { response.set_id(20); response.set_theme("Children"); - Buffer::InstancePtr response_data = Grpc::Common::serializeBody(response); + Buffer::InstancePtr response_data = Grpc::Common::serializeToGrpcFrame(response); EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.encodeData(*response_data, true)); frames.clear(); @@ -640,7 +640,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryWithHttpBodyAsOutput) { response.set_content_type("text/html"); response.set_data("

Hello, world!

"); - auto response_data = Grpc::Common::serializeBody(response); + auto response_data = Grpc::Common::serializeToGrpcFrame(response); EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, filter_.encodeData(*response_data, false)); @@ -674,7 +674,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryWithHttpBodyAsOutputAndSpli response.set_content_type("text/html"); response.set_data("

Hello, world!

"); - auto response_data = Grpc::Common::serializeBody(response); + auto response_data = Grpc::Common::serializeToGrpcFrame(response); // Firstly, the response data buffer is split into two parts. Buffer::OwnedImpl response_data_first_part; @@ -737,7 +737,7 @@ TEST_P(GrpcJsonTranscoderFilterPrintTest, PrintOptions) { author.set_gender(bookstore::Author_Gender_MALE); author.set_last_name("Shakespeare"); - const auto response_data = Grpc::Common::serializeBody(author); + const auto response_data = Grpc::Common::serializeToGrpcFrame(author); EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, filter_->encodeData(*response_data, false)); diff --git a/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc b/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc index 75a34c1acf9a8..3e3deb675c361 100644 --- a/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc +++ b/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc @@ -202,7 +202,7 @@ TEST_F(LightStepDriverTest, FlushSeveralSpans) { std::unique_ptr collector_response = lightstep::Transporter::MakeCollectorResponse(); EXPECT_NE(collector_response, nullptr); - msg->body() = Grpc::Common::serializeBody(*collector_response); + msg->body() = Grpc::Common::serializeToGrpcFrame(*collector_response); callback->onSuccess(std::move(msg)); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 4ec4e181a3af9..dc979dc4f568e 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -90,7 +90,7 @@ class FakeStream : public Http::StreamDecoder, void startGrpcStream(); void finishGrpcStream(Grpc::Status::GrpcStatus status); template void sendGrpcMessage(const T& message) { - auto serialized_response = Grpc::Common::serializeBody(message); + auto serialized_response = Grpc::Common::serializeToGrpcFrame(message); encodeData(*serialized_response, false); ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString()); } From 6014cd48caeec983385b832371a669218d0f4fcf Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Mon, 29 Apr 2019 09:01:52 -0700 Subject: [PATCH 02/17] Fix CI test: use-after-move. Signed-off-by: John Plevyak --- source/common/grpc/common.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 0065e988592e8..a76add6ca8584 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -318,7 +318,7 @@ grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr bufferInstance) { return {&oneSlice, 1}; } STACK_ARRAY(manyRawSlices, Buffer::RawSlice, nSlices); - bufferInstance->getRawSlices(manyRawSlices.begin(), nSlices); + container->buffer_->getRawSlices(manyRawSlices.begin(), nSlices); std::vector slices; slices.reserve(nSlices); for (int i = 0; i < nSlices; i++) { From 88ef0fcb8f3ebfb1adccb972746920465260b17b Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Mon, 29 Apr 2019 09:21:59 -0700 Subject: [PATCH 03/17] Fix typo. Signed-off-by: John Plevyak --- source/common/grpc/common.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index b617792fecba9..06cb45984793f 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -155,7 +155,7 @@ class Common { static std::string typeUrl(const std::string& qualified_name); /** - * BUild grpc::ByteBuffer which aliases the data in a Buffer::InstancePtr. + * Build grpc::ByteBuffer which aliases the data in a Buffer::InstancePtr. * @param bufferInstance source data container. * @return byteBuffer target container aliased to the data in Buffer::Instance and owning the * Buffer::Instance. From 253b584311fb22568d517afc16ca2e2bc6b91cab Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Tue, 30 Apr 2019 08:41:32 -0700 Subject: [PATCH 04/17] Address comments. Signed-off-by: John Plevyak --- source/common/grpc/common.cc | 40 ++++++++++++++++----------------- source/common/grpc/common.h | 2 +- test/common/grpc/common_test.cc | 2 +- test/test_common/environment.cc | 3 +++ 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index a76add6ca8584..971382faacfc7 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -301,27 +301,27 @@ static void derefBufferInstanceContainer(void* container_ptr) { } } -grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr bufferInstance) { - if (!bufferInstance) { +grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) { + if (!buffer_instance) { return {}; } - Buffer::RawSlice oneRawSlice; + Buffer::RawSlice on_raw_slice; // NB: we need to pass in >= 1 in order to get the real "n" (see Buffer::Instance for details). - int nSlices = bufferInstance->getRawSlices(&oneRawSlice, 1); - if (nSlices <= 0) { + int n_slices = buffer_instance->getRawSlices(&on_raw_slice, 1); + if (n_slices <= 0) { return {}; } - auto container = new BufferInstanceContainer{nSlices, std::move(bufferInstance)}; - if (nSlices == 1) { - grpc::Slice oneSlice(oneRawSlice.mem_, oneRawSlice.len_, &derefBufferInstanceContainer, + auto container = new BufferInstanceContainer{n_slices, std::move(buffer_instance)}; + if (n_slices == 1) { + grpc::Slice oneSlice(on_raw_slice.mem_, on_raw_slice.len_, &derefBufferInstanceContainer, container); return {&oneSlice, 1}; } - STACK_ARRAY(manyRawSlices, Buffer::RawSlice, nSlices); - container->buffer_->getRawSlices(manyRawSlices.begin(), nSlices); + STACK_ARRAY(manyRawSlices, Buffer::RawSlice, n_slices); + container->buffer_->getRawSlices(manyRawSlices.begin(), n_slices); std::vector slices; - slices.reserve(nSlices); - for (int i = 0; i < nSlices; i++) { + slices.reserve(n_slices); + for (int i = 0; i < n_slices; i++) { slices.emplace_back(manyRawSlices[i].mem_, manyRawSlices[i].len_, &derefBufferInstanceContainer, container); } @@ -330,21 +330,21 @@ grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr bufferInstance) { struct ByteBufferContainer { ByteBufferContainer(int ref_count) : ref_count_(ref_count) {} - ~ByteBufferContainer() { ::free(fragments); } + ~ByteBufferContainer() { ::free(fragments_); } std::atomic ref_count_; - Buffer::BufferFragmentImpl* fragments = nullptr; + Buffer::BufferFragmentImpl* fragments_ = nullptr; std::vector slices_; }; -Buffer::InstancePtr Common::makeBufferInstance(const grpc::ByteBuffer& byteBuffer) { +Buffer::InstancePtr Common::makeBufferInstance(const grpc::ByteBuffer& byte_buffer) { auto buffer = std::make_unique(); - if (byteBuffer.Length() == 0) { + if (byte_buffer.Length() == 0) { return buffer; } // NB: ByteBuffer::Dump moves the data out of the ByteBuffer so we need to ensure that the // lifetime of the Slice(s) exceeds our Buffer::Instance. std::vector slices; - byteBuffer.Dump(&slices); + byte_buffer.Dump(&slices); if (slices.size() == 0) { return buffer; } @@ -358,14 +358,14 @@ Buffer::InstancePtr Common::makeBufferInstance(const grpc::ByteBuffer& byteBuffe }; // NB: addBufferFragment takes a pointer alias to the BufferFragmentImpl which is passed in so we // need to ensure that the lifetime of those objects exceeds that of the Buffer::Instance. - container->fragments = static_cast( + container->fragments_ = static_cast( ::malloc(sizeof(Buffer::BufferFragmentImpl) * slices.size())); for (size_t i = 0; i < slices.size(); i++) { - new (&container->fragments[i]) + new (&container->fragments_[i]) Buffer::BufferFragmentImpl(slices[i].begin(), slices[i].size(), releaser); } for (size_t i = 0; i < slices.size(); i++) { - buffer->addBufferFragment(container->fragments[i]); + buffer->addBufferFragment(container->fragments_[i]); } container->slices_ = std::move(slices); return buffer; diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index 06cb45984793f..aeb5c48fff48d 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -160,7 +160,7 @@ class Common { * @return byteBuffer target container aliased to the data in Buffer::Instance and owning the * Buffer::Instance. */ - static grpc::ByteBuffer makeByteBuffer(Buffer::InstancePtr bufferInstance); + static grpc::ByteBuffer makeByteBuffer(Buffer::InstancePtr&& bufferInstance); /** * Build Buffer::Instance which aliases the data in a grpc::ByteBuffer. diff --git a/test/common/grpc/common_test.cc b/test/common/grpc/common_test.cc index 744c24fb8aec8..73da83daca0b4 100644 --- a/test/common/grpc/common_test.cc +++ b/test/common/grpc/common_test.cc @@ -412,7 +412,7 @@ TEST(GrpcCommonTest, ByteBufferInstanceRoundTrip) { EXPECT_EQ(bufferInstance2->toString(), "test this"); } -TEST(GrpcCommonTest, PreependGrpcFrameHeader) { +TEST(GrpcCommonTest, PrependGrpcFrameHeader) { auto buffer = std::make_unique(); buffer->add("test", 4); char expected_header[5]; diff --git a/test/test_common/environment.cc b/test/test_common/environment.cc index d56bcc2a7ef66..30e4abc00f967 100644 --- a/test/test_common/environment.cc +++ b/test/test_common/environment.cc @@ -148,6 +148,8 @@ bool TestEnvironment::shouldRunTestForIpVersion(Network::Address::IpVersion type std::vector TestEnvironment::getIpVersionsForTest() { std::vector parameters; + parameters.push_back(Network::Address::IpVersion::v4); +#if 0 for (auto version : {Network::Address::IpVersion::v4, Network::Address::IpVersion::v6}) { if (TestEnvironment::shouldRunTestForIpVersion(version)) { parameters.push_back(version); @@ -159,6 +161,7 @@ std::vector TestEnvironment::getIpVersionsForTest() } } } +#endif return parameters; } From 5e0763a584d3106f55673027ce0a084bc9f9ba9d Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Tue, 30 Apr 2019 09:41:25 -0700 Subject: [PATCH 05/17] Address comments. Signed-off-by: John Plevyak --- source/common/grpc/common.cc | 33 +++++++++++++++++---------------- test/common/grpc/common_test.cc | 4 ++++ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 971382faacfc7..a193a2bf86ac3 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -136,7 +136,7 @@ Buffer::InstancePtr Common::serializeToGrpcFrame(const Protobuf::Message& messag } Buffer::InstancePtr Common::serializeMessage(const Protobuf::Message& message) { - Buffer::InstancePtr body(new Buffer::OwnedImpl()); + auto body = std::make_unique(); const uint32_t size = message.ByteSize(); Buffer::RawSlice iovec; body->reserve(size, &iovec, 1); @@ -289,17 +289,18 @@ std::string Common::typeUrl(const std::string& qualified_name) { struct BufferInstanceContainer { BufferInstanceContainer(int ref_count, Buffer::InstancePtr buffer) : ref_count_(ref_count), buffer_(std::move(buffer)) {} - std::atomic ref_count_; + std::atomic ref_count_; // In case gPRC dereferences in a different threads. Buffer::InstancePtr buffer_; -}; -static void derefBufferInstanceContainer(void* container_ptr) { - auto container = reinterpret_cast(container_ptr); - container->ref_count_--; - if (container->ref_count_ <= 0) { - delete container; + static void derefBufferInstanceContainer(void* container_ptr) { + auto container = reinterpret_cast(container_ptr); + container->ref_count_--; + // This is safe because the ref_count_ is never incremented. + if (container->ref_count_ <= 0) { + delete container; + } } -} +}; grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) { if (!buffer_instance) { @@ -313,8 +314,8 @@ grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) { } auto container = new BufferInstanceContainer{n_slices, std::move(buffer_instance)}; if (n_slices == 1) { - grpc::Slice oneSlice(on_raw_slice.mem_, on_raw_slice.len_, &derefBufferInstanceContainer, - container); + grpc::Slice oneSlice(on_raw_slice.mem_, on_raw_slice.len_, + &BufferInstanceContainer::derefBufferInstanceContainer, container); return {&oneSlice, 1}; } STACK_ARRAY(manyRawSlices, Buffer::RawSlice, n_slices); @@ -322,8 +323,8 @@ grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) { std::vector slices; slices.reserve(n_slices); for (int i = 0; i < n_slices; i++) { - slices.emplace_back(manyRawSlices[i].mem_, manyRawSlices[i].len_, &derefBufferInstanceContainer, - container); + slices.emplace_back(manyRawSlices[i].mem_, manyRawSlices[i].len_, + &BufferInstanceContainer::derefBufferInstanceContainer, container); } return {&slices[0], slices.size()}; } @@ -331,8 +332,8 @@ grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) { struct ByteBufferContainer { ByteBufferContainer(int ref_count) : ref_count_(ref_count) {} ~ByteBufferContainer() { ::free(fragments_); } - std::atomic ref_count_; - Buffer::BufferFragmentImpl* fragments_ = nullptr; + std::atomic ref_count_; // In case gPRC dereferences in a different threads. + Buffer::BufferFragmentImpl* fragments_ = 0; std::vector slices_; }; @@ -348,7 +349,7 @@ Buffer::InstancePtr Common::makeBufferInstance(const grpc::ByteBuffer& byte_buff if (slices.size() == 0) { return buffer; } - auto container = new ByteBufferContainer(static_cast(slices.size())); + auto* container = new ByteBufferContainer(static_cast(slices.size())); std::function releaser = [container](const void*, size_t, const Buffer::BufferFragmentImpl*) { container->ref_count_--; diff --git a/test/common/grpc/common_test.cc b/test/common/grpc/common_test.cc index 73da83daca0b4..8aaf3e800f908 100644 --- a/test/common/grpc/common_test.cc +++ b/test/common/grpc/common_test.cc @@ -368,6 +368,7 @@ TEST(GrpcCommonTest, MakeBufferInstance1) { EXPECT_EQ(bufferInstance->toString(), "test"); } +// Test building a Buffer::Instance from 3 grpc::Slice(s). TEST(GrpcCommonTest, MakeBufferInstance3) { grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; grpc::ByteBuffer byteBuffer(slices, 3); @@ -388,6 +389,7 @@ TEST(GrpcCommonTest, MakeByteBuffer1) { EXPECT_EQ(str, "test"); } +// Test building a grpc::ByteBuffer from a Bufffer::Instance with 3 slices. TEST(GrpcCommonTest, MakeByteBuffer3) { auto buffer = std::make_unique(); buffer->add("test", 4); @@ -403,6 +405,7 @@ TEST(GrpcCommonTest, MakeByteBuffer3) { EXPECT_EQ(str, "test this"); } +// Test building a Buffer::Instance from a grpc::ByteBuffer from a Bufffer::Instance with 3 slices. TEST(GrpcCommonTest, ByteBufferInstanceRoundTrip) { grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; grpc::ByteBuffer byteBuffer1(slices, 3); @@ -412,6 +415,7 @@ TEST(GrpcCommonTest, ByteBufferInstanceRoundTrip) { EXPECT_EQ(bufferInstance2->toString(), "test this"); } +// Ensure that the correct gPRC header is constructed for a Buffer::Instance. TEST(GrpcCommonTest, PrependGrpcFrameHeader) { auto buffer = std::make_unique(); buffer->add("test", 4); From c54b9da1159aa0fe453cf9460bbb6ad2df8dafff Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Mon, 6 May 2019 11:39:08 -0700 Subject: [PATCH 06/17] Address comments. Signed-off-by: John Plevyak --- source/common/grpc/common.cc | 6 +++--- source/common/grpc/common.h | 2 +- test/common/grpc/common_test.cc | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index a193a2bf86ac3..061b56650b8e1 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -372,12 +372,12 @@ Buffer::InstancePtr Common::makeBufferInstance(const grpc::ByteBuffer& byte_buff return buffer; } -void Common::PrependGrpcFrameHeader(Buffer::Instance* buffer) { +void Common::PrependGrpcFrameHeader(Buffer::Instance& buffer) { char header[5]; header[0] = 0; // flags - const uint32_t nsize = htonl(buffer->length()); + const uint32_t nsize = htonl(buffer.length()); std::memcpy(&header[1], reinterpret_cast(&nsize), sizeof(uint32_t)); - buffer->prepend(absl::string_view(header, 5)); + buffer.prepend(absl::string_view(header, 5)); } } // namespace Grpc diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index aeb5c48fff48d..53b4b8d36bf02 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -173,7 +173,7 @@ class Common { * Prepend a gRPC frame header to a Buffer::Instance containing a single gRPC frame. * @param bufferInstance containing the frame data which will be modified. */ - static void PrependGrpcFrameHeader(Buffer::Instance* buffer); + static void PrependGrpcFrameHeader(Buffer::Instance& buffer); private: static void checkForHeaderOnlyError(Http::Message& http_response); diff --git a/test/common/grpc/common_test.cc b/test/common/grpc/common_test.cc index 8aaf3e800f908..ad999575e25a8 100644 --- a/test/common/grpc/common_test.cc +++ b/test/common/grpc/common_test.cc @@ -424,7 +424,7 @@ TEST(GrpcCommonTest, PrependGrpcFrameHeader) { const uint32_t nsize = htonl(4); std::memcpy(&expected_header[1], reinterpret_cast(&nsize), sizeof(uint32_t)); std::string header_string(expected_header, 5); - Common::PrependGrpcFrameHeader(buffer.get()); + Common::PrependGrpcFrameHeader(*buffer); EXPECT_EQ(buffer->toString(), header_string + "test"); } From 28602810cf502125c90a92b04a14e684e8a8c682 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Mon, 6 May 2019 12:08:07 -0700 Subject: [PATCH 07/17] Address alignment comment. Signed-off-by: John Plevyak --- source/common/grpc/common.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 061b56650b8e1..e148f6e8e03cf 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -359,8 +359,9 @@ Buffer::InstancePtr Common::makeBufferInstance(const grpc::ByteBuffer& byte_buff }; // NB: addBufferFragment takes a pointer alias to the BufferFragmentImpl which is passed in so we // need to ensure that the lifetime of those objects exceeds that of the Buffer::Instance. - container->fragments_ = static_cast( - ::malloc(sizeof(Buffer::BufferFragmentImpl) * slices.size())); + ASSERT(!::posix_memalign(reinterpret_cast(&container->fragments_), + alignof(Buffer::BufferFragmentImpl), + sizeof(Buffer::BufferFragmentImpl) * slices.size())); for (size_t i = 0; i < slices.size(); i++) { new (&container->fragments_[i]) Buffer::BufferFragmentImpl(slices[i].begin(), slices[i].size(), releaser); From 542ea547809dc515a1037727f884c860c48840f1 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Mon, 6 May 2019 12:20:34 -0700 Subject: [PATCH 08/17] Address additional comments. Signed-off-by: John Plevyak --- source/common/grpc/common.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index e148f6e8e03cf..c393c3fae6651 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -289,7 +289,7 @@ std::string Common::typeUrl(const std::string& qualified_name) { struct BufferInstanceContainer { BufferInstanceContainer(int ref_count, Buffer::InstancePtr buffer) : ref_count_(ref_count), buffer_(std::move(buffer)) {} - std::atomic ref_count_; // In case gPRC dereferences in a different threads. + std::atomic ref_count_; // In case gPRC dereferences in a different threads. Buffer::InstancePtr buffer_; static void derefBufferInstanceContainer(void* container_ptr) { @@ -332,7 +332,7 @@ grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) { struct ByteBufferContainer { ByteBufferContainer(int ref_count) : ref_count_(ref_count) {} ~ByteBufferContainer() { ::free(fragments_); } - std::atomic ref_count_; // In case gPRC dereferences in a different threads. + uint32_t ref_count_; Buffer::BufferFragmentImpl* fragments_ = 0; std::vector slices_; }; From 887f759e71eeaec18930161ae93bea318033ab58 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Mon, 6 May 2019 14:09:36 -0700 Subject: [PATCH 09/17] Move google grpc specific code under compile options. Signed-off-by: John Plevyak --- source/common/grpc/BUILD | 19 ++++ source/common/grpc/common.cc | 87 ---------------- source/common/grpc/common.h | 15 --- source/common/grpc/google_grpc_utils.cc | 110 +++++++++++++++++++++ source/common/grpc/google_grpc_utils.h | 32 ++++++ test/common/grpc/BUILD | 12 +++ test/common/grpc/common_test.cc | 64 ------------ test/common/grpc/google_grpc_utils_test.cc | 79 +++++++++++++++ 8 files changed, 252 insertions(+), 166 deletions(-) create mode 100644 source/common/grpc/google_grpc_utils.cc create mode 100644 source/common/grpc/google_grpc_utils.h create mode 100644 test/common/grpc/google_grpc_utils_test.cc diff --git a/source/common/grpc/BUILD b/source/common/grpc/BUILD index 1acb3d414fd40..08f8747ab7228 100644 --- a/source/common/grpc/BUILD +++ b/source/common/grpc/BUILD @@ -86,6 +86,25 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "google_grpc_utils_lib", + srcs = ["google_grpc_utils.cc"], + hdrs = ["google_grpc_utils.h"], + external_deps = [ + "abseil_optional", + "grpc", + ], + deps = [ + "//source/common/buffer:buffer_lib", + "//source/common/common:assert_lib", + "//source/common/common:empty_string", + "//source/common/common:enum_to_int", + "//source/common/common:macros", + "//source/common/common:utility_lib", + "//source/common/grpc:status_lib", + ], +) + envoy_cc_library( name = "google_async_client_lib", srcs = ["google_async_client_impl.cc"], diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index c393c3fae6651..35e871510fbe7 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -286,93 +286,6 @@ std::string Common::typeUrl(const std::string& qualified_name) { return typeUrlPrefix() + "/" + qualified_name; } -struct BufferInstanceContainer { - BufferInstanceContainer(int ref_count, Buffer::InstancePtr buffer) - : ref_count_(ref_count), buffer_(std::move(buffer)) {} - std::atomic ref_count_; // In case gPRC dereferences in a different threads. - Buffer::InstancePtr buffer_; - - static void derefBufferInstanceContainer(void* container_ptr) { - auto container = reinterpret_cast(container_ptr); - container->ref_count_--; - // This is safe because the ref_count_ is never incremented. - if (container->ref_count_ <= 0) { - delete container; - } - } -}; - -grpc::ByteBuffer Common::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) { - if (!buffer_instance) { - return {}; - } - Buffer::RawSlice on_raw_slice; - // NB: we need to pass in >= 1 in order to get the real "n" (see Buffer::Instance for details). - int n_slices = buffer_instance->getRawSlices(&on_raw_slice, 1); - if (n_slices <= 0) { - return {}; - } - auto container = new BufferInstanceContainer{n_slices, std::move(buffer_instance)}; - if (n_slices == 1) { - grpc::Slice oneSlice(on_raw_slice.mem_, on_raw_slice.len_, - &BufferInstanceContainer::derefBufferInstanceContainer, container); - return {&oneSlice, 1}; - } - STACK_ARRAY(manyRawSlices, Buffer::RawSlice, n_slices); - container->buffer_->getRawSlices(manyRawSlices.begin(), n_slices); - std::vector slices; - slices.reserve(n_slices); - for (int i = 0; i < n_slices; i++) { - slices.emplace_back(manyRawSlices[i].mem_, manyRawSlices[i].len_, - &BufferInstanceContainer::derefBufferInstanceContainer, container); - } - return {&slices[0], slices.size()}; -} - -struct ByteBufferContainer { - ByteBufferContainer(int ref_count) : ref_count_(ref_count) {} - ~ByteBufferContainer() { ::free(fragments_); } - uint32_t ref_count_; - Buffer::BufferFragmentImpl* fragments_ = 0; - std::vector slices_; -}; - -Buffer::InstancePtr Common::makeBufferInstance(const grpc::ByteBuffer& byte_buffer) { - auto buffer = std::make_unique(); - if (byte_buffer.Length() == 0) { - return buffer; - } - // NB: ByteBuffer::Dump moves the data out of the ByteBuffer so we need to ensure that the - // lifetime of the Slice(s) exceeds our Buffer::Instance. - std::vector slices; - byte_buffer.Dump(&slices); - if (slices.size() == 0) { - return buffer; - } - auto* container = new ByteBufferContainer(static_cast(slices.size())); - std::function releaser = - [container](const void*, size_t, const Buffer::BufferFragmentImpl*) { - container->ref_count_--; - if (container->ref_count_ <= 0) { - delete container; - } - }; - // NB: addBufferFragment takes a pointer alias to the BufferFragmentImpl which is passed in so we - // need to ensure that the lifetime of those objects exceeds that of the Buffer::Instance. - ASSERT(!::posix_memalign(reinterpret_cast(&container->fragments_), - alignof(Buffer::BufferFragmentImpl), - sizeof(Buffer::BufferFragmentImpl) * slices.size())); - for (size_t i = 0; i < slices.size(); i++) { - new (&container->fragments_[i]) - Buffer::BufferFragmentImpl(slices[i].begin(), slices[i].size(), releaser); - } - for (size_t i = 0; i < slices.size(); i++) { - buffer->addBufferFragment(container->fragments_[i]); - } - container->slices_ = std::move(slices); - return buffer; -} - void Common::PrependGrpcFrameHeader(Buffer::Instance& buffer) { char header[5]; header[0] = 0; // flags diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index 53b4b8d36bf02..e00173bb42d9d 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -154,21 +154,6 @@ class Common { */ static std::string typeUrl(const std::string& qualified_name); - /** - * Build grpc::ByteBuffer which aliases the data in a Buffer::InstancePtr. - * @param bufferInstance source data container. - * @return byteBuffer target container aliased to the data in Buffer::Instance and owning the - * Buffer::Instance. - */ - static grpc::ByteBuffer makeByteBuffer(Buffer::InstancePtr&& bufferInstance); - - /** - * Build Buffer::Instance which aliases the data in a grpc::ByteBuffer. - * @param byteBuffer source data container. - * @param Buffer::InstancePtr target container aliased to the data in grpc::ByteBuffer. - */ - static Buffer::InstancePtr makeBufferInstance(const grpc::ByteBuffer& byteBuffer); - /** * Prepend a gRPC frame header to a Buffer::Instance containing a single gRPC frame. * @param bufferInstance containing the frame data which will be modified. diff --git a/source/common/grpc/google_grpc_utils.cc b/source/common/grpc/google_grpc_utils.cc new file mode 100644 index 0000000000000..e29b7595c0419 --- /dev/null +++ b/source/common/grpc/google_grpc_utils.cc @@ -0,0 +1,110 @@ +#include "common/grpc/google_grpc_utils.h" + +#include +#include +#include +#include + +#include "common/buffer/buffer_impl.h" +#include "common/common/assert.h" +#include "common/common/empty_string.h" +#include "common/common/enum_to_int.h" +#include "common/common/fmt.h" +#include "common/common/macros.h" +#include "common/common/stack_array.h" +#include "common/common/utility.h" + +#include "absl/strings/match.h" + +namespace Envoy { +namespace Grpc { + +struct BufferInstanceContainer { + BufferInstanceContainer(int ref_count, Buffer::InstancePtr buffer) + : ref_count_(ref_count), buffer_(std::move(buffer)) {} + std::atomic ref_count_; // In case gPRC dereferences in a different threads. + Buffer::InstancePtr buffer_; + + static void derefBufferInstanceContainer(void* container_ptr) { + auto container = reinterpret_cast(container_ptr); + container->ref_count_--; + // This is safe because the ref_count_ is never incremented. + if (container->ref_count_ <= 0) { + delete container; + } + } +}; + +grpc::ByteBuffer GoogleGrpcUtils::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) { + if (!buffer_instance) { + return {}; + } + Buffer::RawSlice on_raw_slice; + // NB: we need to pass in >= 1 in order to get the real "n" (see Buffer::Instance for details). + int n_slices = buffer_instance->getRawSlices(&on_raw_slice, 1); + if (n_slices <= 0) { + return {}; + } + auto container = new BufferInstanceContainer{n_slices, std::move(buffer_instance)}; + if (n_slices == 1) { + grpc::Slice oneSlice(on_raw_slice.mem_, on_raw_slice.len_, + &BufferInstanceContainer::derefBufferInstanceContainer, container); + return {&oneSlice, 1}; + } + STACK_ARRAY(manyRawSlices, Buffer::RawSlice, n_slices); + container->buffer_->getRawSlices(manyRawSlices.begin(), n_slices); + std::vector slices; + slices.reserve(n_slices); + for (int i = 0; i < n_slices; i++) { + slices.emplace_back(manyRawSlices[i].mem_, manyRawSlices[i].len_, + &BufferInstanceContainer::derefBufferInstanceContainer, container); + } + return {&slices[0], slices.size()}; +} + +struct ByteBufferContainer { + ByteBufferContainer(int ref_count) : ref_count_(ref_count) {} + ~ByteBufferContainer() { ::free(fragments_); } + uint32_t ref_count_; + Buffer::BufferFragmentImpl* fragments_ = 0; + std::vector slices_; +}; + +Buffer::InstancePtr GoogleGrpcUtils::makeBufferInstance(const grpc::ByteBuffer& byte_buffer) { + auto buffer = std::make_unique(); + if (byte_buffer.Length() == 0) { + return buffer; + } + // NB: ByteBuffer::Dump moves the data out of the ByteBuffer so we need to ensure that the + // lifetime of the Slice(s) exceeds our Buffer::Instance. + std::vector slices; + byte_buffer.Dump(&slices); + if (slices.size() == 0) { + return buffer; + } + auto* container = new ByteBufferContainer(static_cast(slices.size())); + std::function releaser = + [container](const void*, size_t, const Buffer::BufferFragmentImpl*) { + container->ref_count_--; + if (container->ref_count_ <= 0) { + delete container; + } + }; + // NB: addBufferFragment takes a pointer alias to the BufferFragmentImpl which is passed in so we + // need to ensure that the lifetime of those objects exceeds that of the Buffer::Instance. + ASSERT(!::posix_memalign(reinterpret_cast(&container->fragments_), + alignof(Buffer::BufferFragmentImpl), + sizeof(Buffer::BufferFragmentImpl) * slices.size())); + for (size_t i = 0; i < slices.size(); i++) { + new (&container->fragments_[i]) + Buffer::BufferFragmentImpl(slices[i].begin(), slices[i].size(), releaser); + } + for (size_t i = 0; i < slices.size(); i++) { + buffer->addBufferFragment(container->fragments_[i]); + } + container->slices_ = std::move(slices); + return buffer; +} + +} // namespace Grpc +} // namespace Envoy diff --git a/source/common/grpc/google_grpc_utils.h b/source/common/grpc/google_grpc_utils.h new file mode 100644 index 0000000000000..5d1f9d63d83ef --- /dev/null +++ b/source/common/grpc/google_grpc_utils.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + +#include "envoy/buffer/buffer.h" + +#include "grpcpp/grpcpp.h" + +namespace Envoy { +namespace Grpc { + +class GoogleGrpcUtils { +public: + /** + * Build grpc::ByteBuffer which aliases the data in a Buffer::InstancePtr. + * @param bufferInstance source data container. + * @return byteBuffer target container aliased to the data in Buffer::Instance and owning the + * Buffer::Instance. + */ + static grpc::ByteBuffer makeByteBuffer(Buffer::InstancePtr&& bufferInstance); + + /** + * Build Buffer::Instance which aliases the data in a grpc::ByteBuffer. + * @param byteBuffer source data container. + * @param Buffer::InstancePtr target container aliased to the data in grpc::ByteBuffer. + */ + static Buffer::InstancePtr makeBufferInstance(const grpc::ByteBuffer& byteBuffer); +}; + +} // namespace Grpc +} // namespace Envoy diff --git a/test/common/grpc/BUILD b/test/common/grpc/BUILD index 549801ad5e7dd..0d880ea03ff56 100644 --- a/test/common/grpc/BUILD +++ b/test/common/grpc/BUILD @@ -58,6 +58,18 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "google_grpc_utils_test", + srcs = envoy_select_google_grpc(["google_grpc_utils_test.cc"]), + deps = [ + "//source/common/grpc:common_lib", + "//source/common/http:headers_lib", + "//test/mocks/upstream:upstream_mocks", + "//test/proto:helloworld_proto_cc", + "//test/test_common:utility_lib", + ] + envoy_select_google_grpc(["//source/common/grpc:google_grpc_utils_lib"]), +) + envoy_cc_test( name = "google_async_client_impl_test", srcs = envoy_select_google_grpc(["google_async_client_impl_test.cc"]), diff --git a/test/common/grpc/common_test.cc b/test/common/grpc/common_test.cc index ad999575e25a8..7c2e48958123d 100644 --- a/test/common/grpc/common_test.cc +++ b/test/common/grpc/common_test.cc @@ -351,70 +351,6 @@ TEST(GrpcCommonTest, ValidateResponse) { } } -TEST(GrpcCommonTest, MakeBufferInstanceEmpty) { - grpc::ByteBuffer byteBuffer; - Common::makeBufferInstance(byteBuffer); -} - -TEST(GrpcCommonTest, MakeByteBufferEmpty) { - auto buffer = std::make_unique(); - Common::makeByteBuffer(std::move(buffer)); -} - -TEST(GrpcCommonTest, MakeBufferInstance1) { - grpc::Slice slice("test"); - grpc::ByteBuffer byteBuffer(&slice, 1); - auto bufferInstance = Common::makeBufferInstance(byteBuffer); - EXPECT_EQ(bufferInstance->toString(), "test"); -} - -// Test building a Buffer::Instance from 3 grpc::Slice(s). -TEST(GrpcCommonTest, MakeBufferInstance3) { - grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; - grpc::ByteBuffer byteBuffer(slices, 3); - auto bufferInstance = Common::makeBufferInstance(byteBuffer); - EXPECT_EQ(bufferInstance->toString(), "test this"); -} - -TEST(GrpcCommonTest, MakeByteBuffer1) { - auto buffer = std::make_unique(); - buffer->add("test", 4); - auto byteBuffer = Common::makeByteBuffer(std::move(buffer)); - std::vector slices; - byteBuffer.Dump(&slices); - std::string str; - for (auto& s : slices) { - str.append(std::string(reinterpret_cast(s.begin()), s.size())); - } - EXPECT_EQ(str, "test"); -} - -// Test building a grpc::ByteBuffer from a Bufffer::Instance with 3 slices. -TEST(GrpcCommonTest, MakeByteBuffer3) { - auto buffer = std::make_unique(); - buffer->add("test", 4); - buffer->add(" ", 1); - buffer->add("this", 4); - auto byteBuffer = Common::makeByteBuffer(std::move(buffer)); - std::vector slices; - byteBuffer.Dump(&slices); - std::string str; - for (auto& s : slices) { - str.append(std::string(reinterpret_cast(s.begin()), s.size())); - } - EXPECT_EQ(str, "test this"); -} - -// Test building a Buffer::Instance from a grpc::ByteBuffer from a Bufffer::Instance with 3 slices. -TEST(GrpcCommonTest, ByteBufferInstanceRoundTrip) { - grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; - grpc::ByteBuffer byteBuffer1(slices, 3); - auto bufferInstance1 = Common::makeBufferInstance(byteBuffer1); - auto byteBuffer2 = Common::makeByteBuffer(std::move(bufferInstance1)); - auto bufferInstance2 = Common::makeBufferInstance(byteBuffer2); - EXPECT_EQ(bufferInstance2->toString(), "test this"); -} - // Ensure that the correct gPRC header is constructed for a Buffer::Instance. TEST(GrpcCommonTest, PrependGrpcFrameHeader) { auto buffer = std::make_unique(); diff --git a/test/common/grpc/google_grpc_utils_test.cc b/test/common/grpc/google_grpc_utils_test.cc new file mode 100644 index 0000000000000..e385b967bdd4d --- /dev/null +++ b/test/common/grpc/google_grpc_utils_test.cc @@ -0,0 +1,79 @@ +#include + +#include "common/grpc/google_grpc_utils.h" + +#include "test/mocks/upstream/mocks.h" +#include "test/proto/helloworld.pb.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Grpc { + +TEST(GoogleGrpcUtilsTest, MakeBufferInstanceEmpty) { + grpc::ByteBuffer byteBuffer; + GoogleGrpcUtils::makeBufferInstance(byteBuffer); +} + +TEST(GoogleGrpcUtilsTest, MakeByteBufferEmpty) { + auto buffer = std::make_unique(); + GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); +} + +TEST(GoogleGrpcUtilsTest, MakeBufferInstance1) { + grpc::Slice slice("test"); + grpc::ByteBuffer byteBuffer(&slice, 1); + auto bufferInstance = GoogleGrpcUtils::makeBufferInstance(byteBuffer); + EXPECT_EQ(bufferInstance->toString(), "test"); +} + +// Test building a Buffer::Instance from 3 grpc::Slice(s). +TEST(GoogleGrpcUtilsTest, MakeBufferInstance3) { + grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; + grpc::ByteBuffer byteBuffer(slices, 3); + auto bufferInstance = GoogleGrpcUtils::makeBufferInstance(byteBuffer); + EXPECT_EQ(bufferInstance->toString(), "test this"); +} + +TEST(GoogleGrpcUtilsTest, MakeByteBuffer1) { + auto buffer = std::make_unique(); + buffer->add("test", 4); + auto byteBuffer = GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); + std::vector slices; + byteBuffer.Dump(&slices); + std::string str; + for (auto& s : slices) { + str.append(std::string(reinterpret_cast(s.begin()), s.size())); + } + EXPECT_EQ(str, "test"); +} + +// Test building a grpc::ByteBuffer from a Bufffer::Instance with 3 slices. +TEST(GoogleGrpcUtilsTest, MakeByteBuffer3) { + auto buffer = std::make_unique(); + buffer->add("test", 4); + buffer->add(" ", 1); + buffer->add("this", 4); + auto byteBuffer = GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); + std::vector slices; + byteBuffer.Dump(&slices); + std::string str; + for (auto& s : slices) { + str.append(std::string(reinterpret_cast(s.begin()), s.size())); + } + EXPECT_EQ(str, "test this"); +} + +// Test building a Buffer::Instance from a grpc::ByteBuffer from a Bufffer::Instance with 3 slices. +TEST(GoogleGrpcUtilsTest, ByteBufferInstanceRoundTrip) { + grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; + grpc::ByteBuffer byteBuffer1(slices, 3); + auto bufferInstance1 = GoogleGrpcUtils::makeBufferInstance(byteBuffer1); + auto byteBuffer2 = GoogleGrpcUtils::makeByteBuffer(std::move(bufferInstance1)); + auto bufferInstance2 = GoogleGrpcUtils::makeBufferInstance(byteBuffer2); + EXPECT_EQ(bufferInstance2->toString(), "test this"); +} + +} // namespace Grpc +} // namespace Envoy From 84e970967894814d9e31b61c95a5dff8c9c14d59 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Mon, 6 May 2019 19:34:34 -0700 Subject: [PATCH 10/17] Address comments. Signed-off-by: John Plevyak --- source/common/grpc/common.cc | 6 ++-- source/common/grpc/common.h | 2 +- source/common/grpc/google_grpc_utils.cc | 18 +++++----- test/common/grpc/common_test.cc | 6 ++-- test/common/grpc/google_grpc_utils_test.cc | 40 ++++++++++++---------- test/test_common/environment.cc | 3 -- 6 files changed, 37 insertions(+), 38 deletions(-) diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 44073d1aea658..fe94fe7a57edc 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -285,12 +285,12 @@ std::string Common::typeUrl(const std::string& qualified_name) { return typeUrlPrefix() + "/" + qualified_name; } -void Common::PrependGrpcFrameHeader(Buffer::Instance& buffer) { - char header[5]; +void Common::prependGrpcFrameHeader(Buffer::Instance& buffer) { + std::array header; header[0] = 0; // flags const uint32_t nsize = htonl(buffer.length()); std::memcpy(&header[1], reinterpret_cast(&nsize), sizeof(uint32_t)); - buffer.prepend(absl::string_view(header, 5)); + buffer.prepend(absl::string_view(&header[0], 5)); } } // namespace Grpc diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index e00173bb42d9d..1493a3a4241e7 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -158,7 +158,7 @@ class Common { * Prepend a gRPC frame header to a Buffer::Instance containing a single gRPC frame. * @param bufferInstance containing the frame data which will be modified. */ - static void PrependGrpcFrameHeader(Buffer::Instance& buffer); + static void prependGrpcFrameHeader(Buffer::Instance& buffer); private: static void checkForHeaderOnlyError(Http::Message& http_response); diff --git a/source/common/grpc/google_grpc_utils.cc b/source/common/grpc/google_grpc_utils.cc index e29b7595c0419..060ce024bd6f9 100644 --- a/source/common/grpc/google_grpc_utils.cc +++ b/source/common/grpc/google_grpc_utils.cc @@ -20,7 +20,7 @@ namespace Envoy { namespace Grpc { struct BufferInstanceContainer { - BufferInstanceContainer(int ref_count, Buffer::InstancePtr buffer) + BufferInstanceContainer(int ref_count, Buffer::InstancePtr&& buffer) : ref_count_(ref_count), buffer_(std::move(buffer)) {} std::atomic ref_count_; // In case gPRC dereferences in a different threads. Buffer::InstancePtr buffer_; @@ -47,16 +47,16 @@ grpc::ByteBuffer GoogleGrpcUtils::makeByteBuffer(Buffer::InstancePtr&& buffer_in } auto container = new BufferInstanceContainer{n_slices, std::move(buffer_instance)}; if (n_slices == 1) { - grpc::Slice oneSlice(on_raw_slice.mem_, on_raw_slice.len_, - &BufferInstanceContainer::derefBufferInstanceContainer, container); - return {&oneSlice, 1}; + grpc::Slice one_slice(on_raw_slice.mem_, on_raw_slice.len_, + &BufferInstanceContainer::derefBufferInstanceContainer, container); + return {&one_slice, 1}; } - STACK_ARRAY(manyRawSlices, Buffer::RawSlice, n_slices); - container->buffer_->getRawSlices(manyRawSlices.begin(), n_slices); + STACK_ARRAY(many_raw_slices, Buffer::RawSlice, n_slices); + container->buffer_->getRawSlices(many_raw_slices.begin(), n_slices); std::vector slices; slices.reserve(n_slices); for (int i = 0; i < n_slices; i++) { - slices.emplace_back(manyRawSlices[i].mem_, manyRawSlices[i].len_, + slices.emplace_back(many_raw_slices[i].mem_, many_raw_slices[i].len_, &BufferInstanceContainer::derefBufferInstanceContainer, container); } return {&slices[0], slices.size()}; @@ -66,7 +66,7 @@ struct ByteBufferContainer { ByteBufferContainer(int ref_count) : ref_count_(ref_count) {} ~ByteBufferContainer() { ::free(fragments_); } uint32_t ref_count_; - Buffer::BufferFragmentImpl* fragments_ = 0; + Buffer::BufferFragmentImpl* fragments_ = nullptr; std::vector slices_; }; @@ -79,7 +79,7 @@ Buffer::InstancePtr GoogleGrpcUtils::makeBufferInstance(const grpc::ByteBuffer& // lifetime of the Slice(s) exceeds our Buffer::Instance. std::vector slices; byte_buffer.Dump(&slices); - if (slices.size() == 0) { + if (slices.empty()) { return buffer; } auto* container = new ByteBufferContainer(static_cast(slices.size())); diff --git a/test/common/grpc/common_test.cc b/test/common/grpc/common_test.cc index 87cb674bafec2..9e1b280d9a2f6 100644 --- a/test/common/grpc/common_test.cc +++ b/test/common/grpc/common_test.cc @@ -355,12 +355,12 @@ TEST(GrpcCommonTest, ValidateResponse) { TEST(GrpcCommonTest, PrependGrpcFrameHeader) { auto buffer = std::make_unique(); buffer->add("test", 4); - char expected_header[5]; + std::array expected_header; expected_header[0] = 0; // flags const uint32_t nsize = htonl(4); std::memcpy(&expected_header[1], reinterpret_cast(&nsize), sizeof(uint32_t)); - std::string header_string(expected_header, 5); - Common::PrependGrpcFrameHeader(*buffer); + std::string header_string(&expected_header[0], 5); + Common::prependGrpcFrameHeader(*buffer); EXPECT_EQ(buffer->toString(), header_string + "test"); } diff --git a/test/common/grpc/google_grpc_utils_test.cc b/test/common/grpc/google_grpc_utils_test.cc index e385b967bdd4d..584e472c97ef8 100644 --- a/test/common/grpc/google_grpc_utils_test.cc +++ b/test/common/grpc/google_grpc_utils_test.cc @@ -12,8 +12,8 @@ namespace Envoy { namespace Grpc { TEST(GoogleGrpcUtilsTest, MakeBufferInstanceEmpty) { - grpc::ByteBuffer byteBuffer; - GoogleGrpcUtils::makeBufferInstance(byteBuffer); + grpc::ByteBuffer byte_buffer; + GoogleGrpcUtils::makeBufferInstance(byte_buffer); } TEST(GoogleGrpcUtilsTest, MakeByteBufferEmpty) { @@ -23,25 +23,26 @@ TEST(GoogleGrpcUtilsTest, MakeByteBufferEmpty) { TEST(GoogleGrpcUtilsTest, MakeBufferInstance1) { grpc::Slice slice("test"); - grpc::ByteBuffer byteBuffer(&slice, 1); - auto bufferInstance = GoogleGrpcUtils::makeBufferInstance(byteBuffer); - EXPECT_EQ(bufferInstance->toString(), "test"); + grpc::ByteBuffer byte_buffer(&slice, 1); + auto buffer_instance = GoogleGrpcUtils::makeBufferInstance(byte_buffer); + EXPECT_EQ(buffer_instance->toString(), "test"); } // Test building a Buffer::Instance from 3 grpc::Slice(s). TEST(GoogleGrpcUtilsTest, MakeBufferInstance3) { - grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; - grpc::ByteBuffer byteBuffer(slices, 3); - auto bufferInstance = GoogleGrpcUtils::makeBufferInstance(byteBuffer); - EXPECT_EQ(bufferInstance->toString(), "test this"); + std::array slices = {grpc::string("test"), grpc::string(" "), + grpc::string("this")}; + grpc::ByteBuffer byte_buffer(&slices[0], 3); + auto buffer_instance = GoogleGrpcUtils::makeBufferInstance(byte_buffer); + EXPECT_EQ(buffer_instance->toString(), "test this"); } TEST(GoogleGrpcUtilsTest, MakeByteBuffer1) { auto buffer = std::make_unique(); buffer->add("test", 4); - auto byteBuffer = GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); + auto byte_buffer = GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); std::vector slices; - byteBuffer.Dump(&slices); + byte_buffer.Dump(&slices); std::string str; for (auto& s : slices) { str.append(std::string(reinterpret_cast(s.begin()), s.size())); @@ -55,9 +56,9 @@ TEST(GoogleGrpcUtilsTest, MakeByteBuffer3) { buffer->add("test", 4); buffer->add(" ", 1); buffer->add("this", 4); - auto byteBuffer = GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); + auto byte_buffer = GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); std::vector slices; - byteBuffer.Dump(&slices); + byte_buffer.Dump(&slices); std::string str; for (auto& s : slices) { str.append(std::string(reinterpret_cast(s.begin()), s.size())); @@ -67,12 +68,13 @@ TEST(GoogleGrpcUtilsTest, MakeByteBuffer3) { // Test building a Buffer::Instance from a grpc::ByteBuffer from a Bufffer::Instance with 3 slices. TEST(GoogleGrpcUtilsTest, ByteBufferInstanceRoundTrip) { - grpc::Slice slices[3] = {{"test"}, {" "}, {"this"}}; - grpc::ByteBuffer byteBuffer1(slices, 3); - auto bufferInstance1 = GoogleGrpcUtils::makeBufferInstance(byteBuffer1); - auto byteBuffer2 = GoogleGrpcUtils::makeByteBuffer(std::move(bufferInstance1)); - auto bufferInstance2 = GoogleGrpcUtils::makeBufferInstance(byteBuffer2); - EXPECT_EQ(bufferInstance2->toString(), "test this"); + std::array slices = {grpc::string("test"), grpc::string(" "), + grpc::string("this")}; + grpc::ByteBuffer byte_buffer(&slices[0], 3); + auto buffer_instance1 = GoogleGrpcUtils::makeBufferInstance(byte_buffer); + auto byte_buffer2 = GoogleGrpcUtils::makeByteBuffer(std::move(buffer_instance1)); + auto buffer_instance2 = GoogleGrpcUtils::makeBufferInstance(byte_buffer2); + EXPECT_EQ(buffer_instance2->toString(), "test this"); } } // namespace Grpc diff --git a/test/test_common/environment.cc b/test/test_common/environment.cc index 20eb2b598a90f..d3a40f94eb29d 100644 --- a/test/test_common/environment.cc +++ b/test/test_common/environment.cc @@ -148,8 +148,6 @@ bool TestEnvironment::shouldRunTestForIpVersion(Network::Address::IpVersion type std::vector TestEnvironment::getIpVersionsForTest() { std::vector parameters; - parameters.push_back(Network::Address::IpVersion::v4); -#if 0 for (auto version : {Network::Address::IpVersion::v4, Network::Address::IpVersion::v6}) { if (TestEnvironment::shouldRunTestForIpVersion(version)) { parameters.push_back(version); @@ -161,7 +159,6 @@ std::vector TestEnvironment::getIpVersionsForTest() } } } -#endif return parameters; } From deb27a3a1b79feb03bffc2cd8bbfae45818f5ddf Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Tue, 7 May 2019 10:20:51 -0700 Subject: [PATCH 11/17] Fix release build. Signed-off-by: John Plevyak --- source/common/grpc/google_grpc_utils.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/grpc/google_grpc_utils.cc b/source/common/grpc/google_grpc_utils.cc index 060ce024bd6f9..d2a9b60c36da8 100644 --- a/source/common/grpc/google_grpc_utils.cc +++ b/source/common/grpc/google_grpc_utils.cc @@ -92,9 +92,9 @@ Buffer::InstancePtr GoogleGrpcUtils::makeBufferInstance(const grpc::ByteBuffer& }; // NB: addBufferFragment takes a pointer alias to the BufferFragmentImpl which is passed in so we // need to ensure that the lifetime of those objects exceeds that of the Buffer::Instance. - ASSERT(!::posix_memalign(reinterpret_cast(&container->fragments_), - alignof(Buffer::BufferFragmentImpl), - sizeof(Buffer::BufferFragmentImpl) * slices.size())); + ::posix_memalign(reinterpret_cast(&container->fragments_), + alignof(Buffer::BufferFragmentImpl), + sizeof(Buffer::BufferFragmentImpl) * slices.size()); for (size_t i = 0; i < slices.size(); i++) { new (&container->fragments_[i]) Buffer::BufferFragmentImpl(slices[i].begin(), slices[i].size(), releaser); From 8f398ef5909382103971d73cf14d2e4c09f8f753 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Tue, 7 May 2019 11:40:47 -0700 Subject: [PATCH 12/17] Do not ignore the return value of posix_memalign. Signed-off-by: John Plevyak --- source/common/grpc/google_grpc_utils.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/common/grpc/google_grpc_utils.cc b/source/common/grpc/google_grpc_utils.cc index d2a9b60c36da8..3589b403e9c09 100644 --- a/source/common/grpc/google_grpc_utils.cc +++ b/source/common/grpc/google_grpc_utils.cc @@ -92,9 +92,10 @@ Buffer::InstancePtr GoogleGrpcUtils::makeBufferInstance(const grpc::ByteBuffer& }; // NB: addBufferFragment takes a pointer alias to the BufferFragmentImpl which is passed in so we // need to ensure that the lifetime of those objects exceeds that of the Buffer::Instance. - ::posix_memalign(reinterpret_cast(&container->fragments_), - alignof(Buffer::BufferFragmentImpl), - sizeof(Buffer::BufferFragmentImpl) * slices.size()); + RELEASE_ASSERT(!::posix_memalign(reinterpret_cast(&container->fragments_), + alignof(Buffer::BufferFragmentImpl), + sizeof(Buffer::BufferFragmentImpl) * slices.size()), + "posix_memalign failure"); for (size_t i = 0; i < slices.size(); i++) { new (&container->fragments_[i]) Buffer::BufferFragmentImpl(slices[i].begin(), slices[i].size(), releaser); From 406bd50e6267cc7e34ee953ec8a2f92348a2e6d5 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Wed, 8 May 2019 17:20:46 -0700 Subject: [PATCH 13/17] Remove unnecessary dependency. Signed-off-by: John Plevyak --- source/common/grpc/BUILD | 6 +----- source/common/grpc/common.h | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/source/common/grpc/BUILD b/source/common/grpc/BUILD index 08f8747ab7228..069f5d9af5690 100644 --- a/source/common/grpc/BUILD +++ b/source/common/grpc/BUILD @@ -62,10 +62,7 @@ envoy_cc_library( name = "common_lib", srcs = ["common.cc"], hdrs = ["common.h"], - external_deps = [ - "abseil_optional", - "grpc", - ], + external_deps = ["abseil_optional"], deps = [ "//include/envoy/http:header_map_interface", "//include/envoy/http:message_interface", @@ -114,7 +111,6 @@ envoy_cc_library( "grpc", ], deps = [ - ":async_client_lib", ":common_lib", ":google_grpc_creds_lib", "//include/envoy/api:api_interface", diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index 1493a3a4241e7..87b2cc01aa44c 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -13,7 +13,6 @@ #include "common/protobuf/protobuf.h" #include "absl/types/optional.h" -#include "grpcpp/grpcpp.h" namespace Envoy { namespace Grpc { From 377de24d546d08199c326133cda4b23d2a0b510e Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Wed, 8 May 2019 17:54:57 -0700 Subject: [PATCH 14/17] Address comments by adding comments. Signed-off-by: John Plevyak --- source/common/grpc/common.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index fe94fe7a57edc..2021f34654935 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -118,6 +118,8 @@ bool Common::resolveServiceAndMethod(const Http::HeaderEntry* path, std::string* Buffer::InstancePtr Common::serializeToGrpcFrame(const Protobuf::Message& message) { // http://www.grpc.io/docs/guides/wire.html // Reserve enough space for the entire message and the 5 byte header. + // NB: we do not use prependGrpcFrameHeader because that would add another BufferFragment and this + // (using a single BufferFragment) is more efficient. Buffer::InstancePtr body(new Buffer::OwnedImpl()); const uint32_t size = message.ByteSize(); const uint32_t alloc_size = size + 5; From 8a9b7f4d282ac9837e48dae375b3e06accb21d54 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Wed, 8 May 2019 17:57:47 -0700 Subject: [PATCH 15/17] Address comment. Signed-off-by: John Plevyak --- source/common/grpc/google_grpc_utils.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/grpc/google_grpc_utils.cc b/source/common/grpc/google_grpc_utils.cc index 3589b403e9c09..35c4cc6d2d6d9 100644 --- a/source/common/grpc/google_grpc_utils.cc +++ b/source/common/grpc/google_grpc_utils.cc @@ -45,7 +45,7 @@ grpc::ByteBuffer GoogleGrpcUtils::makeByteBuffer(Buffer::InstancePtr&& buffer_in if (n_slices <= 0) { return {}; } - auto container = new BufferInstanceContainer{n_slices, std::move(buffer_instance)}; + auto* container = new BufferInstanceContainer{n_slices, std::move(buffer_instance)}; if (n_slices == 1) { grpc::Slice one_slice(on_raw_slice.mem_, on_raw_slice.len_, &BufferInstanceContainer::derefBufferInstanceContainer, container); From 70ecea2daccc9092079584b8f673f9a4bdac5e37 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Wed, 8 May 2019 18:48:15 -0700 Subject: [PATCH 16/17] Address comments. Signed-off-by: John Plevyak --- source/common/grpc/common.h | 4 ++-- source/common/grpc/google_grpc_utils.cc | 2 +- source/common/grpc/google_grpc_utils.h | 11 ++++++----- test/common/grpc/google_grpc_utils_test.cc | 13 ++++++++++--- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/source/common/grpc/common.h b/source/common/grpc/common.h index 87b2cc01aa44c..acec644e9e125 100644 --- a/source/common/grpc/common.h +++ b/source/common/grpc/common.h @@ -119,7 +119,7 @@ class Common { std::string* method); /** - * Serialize protobuf message. With grpc header. + * Serialize protobuf message with gRPC frame header. */ static Buffer::InstancePtr serializeToGrpcFrame(const Protobuf::Message& message); @@ -155,7 +155,7 @@ class Common { /** * Prepend a gRPC frame header to a Buffer::Instance containing a single gRPC frame. - * @param bufferInstance containing the frame data which will be modified. + * @param buffer containing the frame data which will be modified. */ static void prependGrpcFrameHeader(Buffer::Instance& buffer); diff --git a/source/common/grpc/google_grpc_utils.cc b/source/common/grpc/google_grpc_utils.cc index 35c4cc6d2d6d9..d1b302386e234 100644 --- a/source/common/grpc/google_grpc_utils.cc +++ b/source/common/grpc/google_grpc_utils.cc @@ -26,7 +26,7 @@ struct BufferInstanceContainer { Buffer::InstancePtr buffer_; static void derefBufferInstanceContainer(void* container_ptr) { - auto container = reinterpret_cast(container_ptr); + auto container = static_cast(container_ptr); container->ref_count_--; // This is safe because the ref_count_ is never incremented. if (container->ref_count_ <= 0) { diff --git a/source/common/grpc/google_grpc_utils.h b/source/common/grpc/google_grpc_utils.h index 5d1f9d63d83ef..86975ea4f6bd3 100644 --- a/source/common/grpc/google_grpc_utils.h +++ b/source/common/grpc/google_grpc_utils.h @@ -14,18 +14,19 @@ class GoogleGrpcUtils { public: /** * Build grpc::ByteBuffer which aliases the data in a Buffer::InstancePtr. - * @param bufferInstance source data container. + * @param buffer source data container. * @return byteBuffer target container aliased to the data in Buffer::Instance and owning the * Buffer::Instance. */ - static grpc::ByteBuffer makeByteBuffer(Buffer::InstancePtr&& bufferInstance); + static grpc::ByteBuffer makeByteBuffer(Buffer::InstancePtr&& buffer); /** * Build Buffer::Instance which aliases the data in a grpc::ByteBuffer. - * @param byteBuffer source data container. - * @param Buffer::InstancePtr target container aliased to the data in grpc::ByteBuffer. + * @param buffer source data container. + * @return a Buffer::InstancePtr aliased to the data in the provided grpc::ByteBuffer and + * owning the corresponding grpc::Slice(s). */ - static Buffer::InstancePtr makeBufferInstance(const grpc::ByteBuffer& byteBuffer); + static Buffer::InstancePtr makeBufferInstance(const grpc::ByteBuffer& buffer); }; } // namespace Grpc diff --git a/test/common/grpc/google_grpc_utils_test.cc b/test/common/grpc/google_grpc_utils_test.cc index 584e472c97ef8..6f7a96a497c50 100644 --- a/test/common/grpc/google_grpc_utils_test.cc +++ b/test/common/grpc/google_grpc_utils_test.cc @@ -10,6 +10,7 @@ namespace Envoy { namespace Grpc { +namespace { TEST(GoogleGrpcUtilsTest, MakeBufferInstanceEmpty) { grpc::ByteBuffer byte_buffer; @@ -19,6 +20,8 @@ TEST(GoogleGrpcUtilsTest, MakeBufferInstanceEmpty) { TEST(GoogleGrpcUtilsTest, MakeByteBufferEmpty) { auto buffer = std::make_unique(); GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); + buffer = nullptr; + GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); } TEST(GoogleGrpcUtilsTest, MakeBufferInstance1) { @@ -53,9 +56,12 @@ TEST(GoogleGrpcUtilsTest, MakeByteBuffer1) { // Test building a grpc::ByteBuffer from a Bufffer::Instance with 3 slices. TEST(GoogleGrpcUtilsTest, MakeByteBuffer3) { auto buffer = std::make_unique(); - buffer->add("test", 4); - buffer->add(" ", 1); - buffer->add("this", 4); + Buffer::BufferFragmentImpl f1("test", 4, nullptr); + buffer->addBufferFragment(f1); + Buffer::BufferFragmentImpl f2(" ", 1, nullptr); + buffer->addBufferFragment(f2); + Buffer::BufferFragmentImpl f3("this", 4, nullptr); + buffer->addBufferFragment(f3); auto byte_buffer = GoogleGrpcUtils::makeByteBuffer(std::move(buffer)); std::vector slices; byte_buffer.Dump(&slices); @@ -77,5 +83,6 @@ TEST(GoogleGrpcUtilsTest, ByteBufferInstanceRoundTrip) { EXPECT_EQ(buffer_instance2->toString(), "test this"); } +} // namespace } // namespace Grpc } // namespace Envoy From 1c87b72641931e6a5edc8ae1553eb063a1f12621 Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Thu, 9 May 2019 10:36:55 -0700 Subject: [PATCH 17/17] Address comments. Signed-off-by: John Plevyak --- source/common/grpc/google_grpc_utils.cc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/source/common/grpc/google_grpc_utils.cc b/source/common/grpc/google_grpc_utils.cc index d1b302386e234..82325780a337c 100644 --- a/source/common/grpc/google_grpc_utils.cc +++ b/source/common/grpc/google_grpc_utils.cc @@ -41,7 +41,7 @@ grpc::ByteBuffer GoogleGrpcUtils::makeByteBuffer(Buffer::InstancePtr&& buffer_in } Buffer::RawSlice on_raw_slice; // NB: we need to pass in >= 1 in order to get the real "n" (see Buffer::Instance for details). - int n_slices = buffer_instance->getRawSlices(&on_raw_slice, 1); + const int n_slices = buffer_instance->getRawSlices(&on_raw_slice, 1); if (n_slices <= 0) { return {}; } @@ -79,9 +79,6 @@ Buffer::InstancePtr GoogleGrpcUtils::makeBufferInstance(const grpc::ByteBuffer& // lifetime of the Slice(s) exceeds our Buffer::Instance. std::vector slices; byte_buffer.Dump(&slices); - if (slices.empty()) { - return buffer; - } auto* container = new ByteBufferContainer(static_cast(slices.size())); std::function releaser = [container](const void*, size_t, const Buffer::BufferFragmentImpl*) {