diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index fa70fad50c1a7..d1b09b363b1d4 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -84,7 +84,7 @@ def cc_grpc_httpjson_transcoding_dep(): native.git_repository( name = "grpc_httpjson_transcoding", remote = "https://github.com/grpc-ecosystem/grpc-httpjson-transcoding.git", - commit = "193aa283914ba701c12cfdfa5967c1c4210468e3", + commit = "3a90dfd2e7300e8dd60b74f0f4085f2a0bfc499e", ) # Bazel native C++ dependencies. For the depedencies that doesn't provide autoconf/automake builds. diff --git a/source/common/buffer/BUILD b/source/common/buffer/BUILD index 8752531a62b83..e62353a04746c 100644 --- a/source/common/buffer/BUILD +++ b/source/common/buffer/BUILD @@ -17,3 +17,13 @@ envoy_cc_library( "//source/common/event:libevent_lib", ], ) + +envoy_cc_library( + name = "zero_copy_input_stream_lib", + srcs = ["zero_copy_input_stream_impl.cc"], + hdrs = ["zero_copy_input_stream_impl.h"], + external_deps = ["protobuf"], + deps = [ + ":buffer_lib", + ], +) diff --git a/source/common/buffer/zero_copy_input_stream_impl.cc b/source/common/buffer/zero_copy_input_stream_impl.cc new file mode 100644 index 0000000000000..b16504c022bb0 --- /dev/null +++ b/source/common/buffer/zero_copy_input_stream_impl.cc @@ -0,0 +1,62 @@ +#include "common/buffer/zero_copy_input_stream_impl.h" + +#include "common/buffer/buffer_impl.h" +#include "common/common/assert.h" + +namespace Envoy { +namespace Buffer { + +ZeroCopyInputStreamImpl::ZeroCopyInputStreamImpl() : buffer_(new Buffer::OwnedImpl) {} + +ZeroCopyInputStreamImpl::ZeroCopyInputStreamImpl(Buffer::InstancePtr&& buffer) + : buffer_(std::move(buffer)) { + finish(); +} + +void ZeroCopyInputStreamImpl::move(Buffer::Instance& instance) { + ASSERT(!finished_); + + buffer_->move(instance); +} + +bool ZeroCopyInputStreamImpl::Next(const void** data, int* size) { + if (position_ != 0) { + buffer_->drain(position_); + position_ = 0; + } + + Buffer::RawSlice slice; + const uint64_t num_slices = buffer_->getRawSlices(&slice, 1); + + if (num_slices > 0 && slice.len_ > 0) { + *data = slice.mem_; + *size = slice.len_; + position_ = slice.len_; + byte_count_ += slice.len_; + return true; + } + + if (!finished_) { + *data = nullptr; + *size = 0; + return true; + } + return false; +} + +bool ZeroCopyInputStreamImpl::Skip(int) { NOT_IMPLEMENTED; } + +void ZeroCopyInputStreamImpl::BackUp(int count) { + ASSERT(count > 0); + ASSERT(uint64_t(count) < position_); + + // Preconditions for BackUp: + // - The last method called must have been Next(). + // - count must be less than or equal to the size of the last buffer returned by Next(). + // Due to preconditions above, it is safe to just adjust position_ and byte_count_ here, and + // drain in Next(). + position_ -= count; + byte_count_ -= count; +} +} +} diff --git a/source/common/buffer/zero_copy_input_stream_impl.h b/source/common/buffer/zero_copy_input_stream_impl.h new file mode 100644 index 0000000000000..7d374aa673399 --- /dev/null +++ b/source/common/buffer/zero_copy_input_stream_impl.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include + +#include "envoy/buffer/buffer.h" + +#include "google/protobuf/io/zero_copy_stream.h" + +namespace Envoy { + +namespace Buffer { + +class ZeroCopyInputStreamImpl : public virtual google::protobuf::io::ZeroCopyInputStream { +public: + // Create input stream with one buffer, and finish immediately + ZeroCopyInputStreamImpl(Buffer::InstancePtr&& buffer); + + // Create input stream with empty buffer + ZeroCopyInputStreamImpl(); + + // Add a buffer to input stream, will consume all buffer from parameter + // if the stream is not finished + void move(Buffer::Instance& instance); + + // Mark the stream is finished + void finish() { finished_ = true; } + + // google::protobuf::io::ZeroCopyInputStream + // See + // https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyInputStream + // for each method details. + + // Note Next() will return true with no data until more data is available if the stream is not + // finished. It is the caller's responsibility to finish the stream or wrap with + // LimitingInputStream before passing to protobuf code to avoid a spin loop. + virtual bool Next(const void** data, int* size) override; + virtual void BackUp(int count) override; + virtual bool Skip(int count) override; // Not implemented + virtual google::protobuf::int64 ByteCount() const override { return byte_count_; } + +protected: + Buffer::InstancePtr buffer_; + uint64_t position_{0}; + +private: + bool finished_{false}; + uint64_t byte_count_{0}; +}; + +} // namespace Buffer +} // namespace Envoy diff --git a/source/common/grpc/BUILD b/source/common/grpc/BUILD index 619b6c21dbeb0..29a1f9a1aff17 100644 --- a/source/common/grpc/BUILD +++ b/source/common/grpc/BUILD @@ -15,6 +15,7 @@ envoy_cc_library( ":codec_lib", ":common_lib", "//include/envoy/grpc:async_client_interface", + "//source/common/buffer:zero_copy_input_stream_lib", "//source/common/http:async_client_lib", ], ) @@ -94,6 +95,7 @@ envoy_cc_library( ":common_lib", "//include/envoy/grpc:rpc_channel_interface", "//include/envoy/upstream:cluster_manager_interface", + "//source/common/buffer:zero_copy_input_stream_lib", "//source/common/common:assert_lib", "//source/common/common:enum_to_int", "//source/common/common:utility_lib", @@ -112,3 +114,13 @@ envoy_cc_library( "//include/envoy/config:subscription_interface", ], ) + +envoy_cc_library( + name = "transcoder_input_stream_lib", + srcs = ["transcoder_input_stream_impl.cc"], + hdrs = ["transcoder_input_stream_impl.h"], + external_deps = ["grpc_transcoding"], + deps = [ + "//source/common/buffer:zero_copy_input_stream_lib", + ], +) diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 4ccd29d5a417a..2cf431b4f4864 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -2,6 +2,7 @@ #include "envoy/grpc/async_client.h" +#include "common/buffer/zero_copy_input_stream_impl.h" #include "common/common/enum_to_int.h" #include "common/common/linked_object.h" #include "common/grpc/codec.h" @@ -110,14 +111,12 @@ class AsyncClientStreamImpl : public AsyncClientStream, return; } - for (const auto& frame : decoded_frames_) { + for (auto& frame : decoded_frames_) { std::unique_ptr response(new ResponseType()); - // TODO(htuch): We can avoid linearizing the buffer here when Buffer::Instance implements - // protobuf ZeroCopyInputStream. // TODO(htuch): Need to add support for compressed responses as well here. - if (frame.flags_ != GRPC_FH_DEFAULT || - !response->ParseFromArray(frame.data_->linearize(frame.data_->length()), - frame.data_->length())) { + Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_)); + + if (frame.flags_ != GRPC_FH_DEFAULT || !response->ParseFromZeroCopyStream(&stream)) { streamError(Status::GrpcStatus::Internal); return; } diff --git a/source/common/grpc/rpc_channel_impl.cc b/source/common/grpc/rpc_channel_impl.cc index c53e5ea610479..fae7fae9a77a5 100644 --- a/source/common/grpc/rpc_channel_impl.cc +++ b/source/common/grpc/rpc_channel_impl.cc @@ -3,6 +3,7 @@ #include #include +#include "common/buffer/zero_copy_input_stream_impl.h" #include "common/common/enum_to_int.h" #include "common/common/utility.h" #include "common/grpc/common.h" @@ -59,7 +60,8 @@ void RpcChannelImpl::onSuccess(Http::MessagePtr&& http_response) { } http_response->body()->drain(5); - if (!grpc_response_->ParseFromString(http_response->bodyAsString())) { + Buffer::ZeroCopyInputStreamImpl stream(std::move(http_response->body())); + if (!grpc_response_->ParseFromZeroCopyStream(&stream)) { throw Exception(Optional(), "bad serialized body"); } diff --git a/source/common/grpc/transcoder_input_stream_impl.cc b/source/common/grpc/transcoder_input_stream_impl.cc new file mode 100644 index 0000000000000..39bb832dfb715 --- /dev/null +++ b/source/common/grpc/transcoder_input_stream_impl.cc @@ -0,0 +1,9 @@ +#include "common/grpc/transcoder_input_stream_impl.h" + +namespace Envoy { +namespace Grpc { + +int64_t TranscoderInputStreamImpl::BytesAvailable() const { return buffer_->length() - position_; } + +} // namespace Grpc +} // namespace Envoy diff --git a/source/common/grpc/transcoder_input_stream_impl.h b/source/common/grpc/transcoder_input_stream_impl.h new file mode 100644 index 0000000000000..7b8eafd8ef10a --- /dev/null +++ b/source/common/grpc/transcoder_input_stream_impl.h @@ -0,0 +1,18 @@ +#pragma once + +#include "common/buffer/zero_copy_input_stream_impl.h" + +#include "grpc_transcoding/transcoder_input_stream.h" + +namespace Envoy { +namespace Grpc { + +class TranscoderInputStreamImpl : public Buffer::ZeroCopyInputStreamImpl, + public google::grpc::transcoding::TranscoderInputStream { +public: + // TranscoderInputStream + virtual int64_t BytesAvailable() const override; +}; + +} // namespace Grpc +} // namespace Envoy diff --git a/test/common/buffer/BUILD b/test/common/buffer/BUILD new file mode 100644 index 0000000000000..aaedc4f077f72 --- /dev/null +++ b/test/common/buffer/BUILD @@ -0,0 +1,17 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_package", +) + +envoy_package() + +envoy_cc_test( + name = "zero_copy_input_stream_test", + srcs = ["zero_copy_input_stream_test.cc"], + deps = [ + "//source/common/buffer:zero_copy_input_stream_lib", + ], +) diff --git a/test/common/buffer/zero_copy_input_stream_test.cc b/test/common/buffer/zero_copy_input_stream_test.cc new file mode 100644 index 0000000000000..a7780c2e0e95d --- /dev/null +++ b/test/common/buffer/zero_copy_input_stream_test.cc @@ -0,0 +1,79 @@ +#include "common/buffer/buffer_impl.h" +#include "common/buffer/zero_copy_input_stream_impl.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Buffer { +namespace { + +class ZeroCopyInputStreamTest : public testing::Test { +public: + ZeroCopyInputStreamTest() { + Buffer::OwnedImpl buffer{"abcd"}; + stream_.move(buffer); + } + + std::string slice_data_{"abcd"}; + ZeroCopyInputStreamImpl stream_; + + const void* data_; + int size_; +}; + +TEST_F(ZeroCopyInputStreamTest, Move) { + Buffer::OwnedImpl buffer{"abcd"}; + stream_.move(buffer); + + EXPECT_EQ(0, buffer.length()); +} + +TEST_F(ZeroCopyInputStreamTest, Next) { + EXPECT_TRUE(stream_.Next(&data_, &size_)); + EXPECT_EQ(4, size_); + EXPECT_EQ(0, memcmp(slice_data_.data(), data_, size_)); +} + +TEST_F(ZeroCopyInputStreamTest, TwoSlices) { + Buffer::OwnedImpl buffer("efgh"); + + stream_.move(buffer); + + EXPECT_TRUE(stream_.Next(&data_, &size_)); + EXPECT_EQ(4, size_); + EXPECT_EQ(0, memcmp(slice_data_.data(), data_, size_)); + EXPECT_TRUE(stream_.Next(&data_, &size_)); + EXPECT_EQ(4, size_); + EXPECT_EQ(0, memcmp("efgh", data_, size_)); +} + +TEST_F(ZeroCopyInputStreamTest, BackUp) { + EXPECT_TRUE(stream_.Next(&data_, &size_)); + EXPECT_EQ(4, size_); + + stream_.BackUp(3); + EXPECT_EQ(1, stream_.ByteCount()); + + EXPECT_TRUE(stream_.Next(&data_, &size_)); + EXPECT_EQ(3, size_); + EXPECT_EQ(0, memcmp("bcd", data_, size_)); + EXPECT_EQ(4, stream_.ByteCount()); +} + +TEST_F(ZeroCopyInputStreamTest, ByteCount) { + EXPECT_EQ(0, stream_.ByteCount()); + EXPECT_TRUE(stream_.Next(&data_, &size_)); + EXPECT_EQ(4, stream_.ByteCount()); +} + +TEST_F(ZeroCopyInputStreamTest, Finish) { + EXPECT_TRUE(stream_.Next(&data_, &size_)); + EXPECT_TRUE(stream_.Next(&data_, &size_)); + EXPECT_EQ(0, size_); + stream_.finish(); + EXPECT_FALSE(stream_.Next(&data_, &size_)); +} + +} // namespace +} // namespace Buffer +} // namespace Envoy diff --git a/test/common/grpc/BUILD b/test/common/grpc/BUILD index 87d9a155b2c27..1734329e998e9 100644 --- a/test/common/grpc/BUILD +++ b/test/common/grpc/BUILD @@ -95,3 +95,12 @@ envoy_cc_test( "//test/test_common:utility_lib", ], ) + +envoy_cc_test( + name = "transcoder_input_stream_test", + srcs = ["transcoder_input_stream_test.cc"], + deps = [ + "//source/common/buffer:buffer_lib", + "//source/common/grpc:transcoder_input_stream_lib", + ], +) diff --git a/test/common/grpc/transcoder_input_stream_test.cc b/test/common/grpc/transcoder_input_stream_test.cc new file mode 100644 index 0000000000000..dcfaf21d29e8e --- /dev/null +++ b/test/common/grpc/transcoder_input_stream_test.cc @@ -0,0 +1,48 @@ +#include "common/buffer/buffer_impl.h" +#include "common/grpc/transcoder_input_stream_impl.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Grpc { +namespace { + +class TranscoderInputStreamTest : public testing::Test { +public: + TranscoderInputStreamTest() { + Buffer::OwnedImpl buffer{"abcd"}; + stream_.move(buffer); + } + + std::string slice_data_{"abcd"}; + TranscoderInputStreamImpl stream_; + + const void* data_; + int size_; +}; + +TEST_F(TranscoderInputStreamTest, BytesAvailable) { + Buffer::OwnedImpl buffer{"abcd"}; + + stream_.move(buffer); + EXPECT_EQ(8, stream_.BytesAvailable()); +} + +TEST_F(TranscoderInputStreamTest, TwoSlices) { + Buffer::OwnedImpl buffer("efghi"); + + stream_.move(buffer); + EXPECT_EQ(9, stream_.BytesAvailable()); +} + +TEST_F(TranscoderInputStreamTest, BackUp) { + EXPECT_TRUE(stream_.Next(&data_, &size_)); + EXPECT_EQ(4, size_); + + stream_.BackUp(3); + EXPECT_EQ(3, stream_.BytesAvailable()); +} + +} // namespace +} // namespace Grpc +} // namespace Envoy