Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def cc_grpc_httpjson_transcoding_dep():
native.git_repository(
name = "grpc_httpjson_transcoding",
remote = "https://github.com/grpc-ecosystem/grpc-httpjson-transcoding.git",
commit = "193aa283914ba701c12cfdfa5967c1c4210468e3",
commit = "3a90dfd2e7300e8dd60b74f0f4085f2a0bfc499e",
)

# Bazel native C++ dependencies. For the depedencies that doesn't provide autoconf/automake builds.
Expand Down
10 changes: 10 additions & 0 deletions source/common/buffer/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ envoy_cc_library(
"//source/common/event:libevent_lib",
],
)

envoy_cc_library(
name = "zero_copy_input_stream_lib",
srcs = ["zero_copy_input_stream_impl.cc"],
hdrs = ["zero_copy_input_stream_impl.h"],
external_deps = ["protobuf"],
deps = [
":buffer_lib",
],
)
62 changes: 62 additions & 0 deletions source/common/buffer/zero_copy_input_stream_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "common/buffer/zero_copy_input_stream_impl.h"

#include "common/buffer/buffer_impl.h"
#include "common/common/assert.h"

namespace Envoy {
namespace Buffer {

ZeroCopyInputStreamImpl::ZeroCopyInputStreamImpl() : buffer_(new Buffer::OwnedImpl) {}

ZeroCopyInputStreamImpl::ZeroCopyInputStreamImpl(Buffer::InstancePtr&& buffer)
: buffer_(std::move(buffer)) {
finish();
}

void ZeroCopyInputStreamImpl::move(Buffer::Instance& instance) {
ASSERT(!finished_);

buffer_->move(instance);
}

bool ZeroCopyInputStreamImpl::Next(const void** data, int* size) {
if (position_ != 0) {
buffer_->drain(position_);
position_ = 0;
}

Buffer::RawSlice slice;
const uint64_t num_slices = buffer_->getRawSlices(&slice, 1);

if (num_slices > 0 && slice.len_ > 0) {
*data = slice.mem_;
*size = slice.len_;
position_ = slice.len_;
byte_count_ += slice.len_;
return true;
}

if (!finished_) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused what happens here. If we return true here with no data, how does the proto code know to call Next() again without just going into a spin loop? Or is this only useful in the explicit transcoding case somehow? Again might need a few more comments for those not as familiar with proto interfaces.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is callers responsibility to maintain. Added comment. transcoding interface add BytesAvailable() to avoid this.

*data = nullptr;
*size = 0;
return true;
}
return false;
}

bool ZeroCopyInputStreamImpl::Skip(int) { NOT_IMPLEMENTED; }

void ZeroCopyInputStreamImpl::BackUp(int count) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little hazy on the proto buffer interfaces, but is this always safe? We are draining() above. Can we BackUp() into nothing? Or does that never happen. If so I would add a comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

The precondition of BackUp() doesn't allow them. so we're safe to draining. Added comments.

ASSERT(count > 0);
ASSERT(uint64_t(count) < position_);

// Preconditions for BackUp:
// - The last method called must have been Next().
// - count must be less than or equal to the size of the last buffer returned by Next().
// Due to preconditions above, it is safe to just adjust position_ and byte_count_ here, and
// drain in Next().
position_ -= count;
byte_count_ -= count;
}
}
}
52 changes: 52 additions & 0 deletions source/common/buffer/zero_copy_input_stream_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <cstdint>
#include <string>

#include "envoy/buffer/buffer.h"

#include "google/protobuf/io/zero_copy_stream.h"

namespace Envoy {

namespace Buffer {

class ZeroCopyInputStreamImpl : public virtual google::protobuf::io::ZeroCopyInputStream {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is virtual needed here? Is this because google::protobuf::io::ZeroCopyInputStream isn't pure? Do have potential diamond inheritance with this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, TranscoderInputStreamImpl inherits this class and TranscoderInputStream from transcoding library, which inherits ZeroCopyInputStream.

public:
// Create input stream with one buffer, and finish immediately
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if there is any easy way to do this, but in this case, it would be nice if we didn't have to allocate a new buffer on the stack, just to move into, and then operate. IMO we should be able to operate directly on the input buffer since it's non-const anyway. I would just put in a TODO here for a potential perf optimization.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems all usage are using InstancePtr, so just changed the signature with InstancePtr in this class.

ZeroCopyInputStreamImpl(Buffer::InstancePtr&& buffer);

// Create input stream with empty buffer
ZeroCopyInputStreamImpl();

// Add a buffer to input stream, will consume all buffer from parameter
// if the stream is not finished
void move(Buffer::Instance& instance);

// Mark the stream is finished
void finish() { finished_ = true; }

// google::protobuf::io::ZeroCopyInputStream
// See
// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyInputStream
// for each method details.

// Note Next() will return true with no data until more data is available if the stream is not
// finished. It is the caller's responsibility to finish the stream or wrap with
// LimitingInputStream before passing to protobuf code to avoid a spin loop.
virtual bool Next(const void** data, int* size) override;
virtual void BackUp(int count) override;
virtual bool Skip(int count) override; // Not implemented
virtual google::protobuf::int64 ByteCount() const override { return byte_count_; }

protected:
Buffer::InstancePtr buffer_;
uint64_t position_{0};

private:
bool finished_{false};
uint64_t byte_count_{0};
};

} // namespace Buffer
} // namespace Envoy
12 changes: 12 additions & 0 deletions source/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ envoy_cc_library(
":codec_lib",
":common_lib",
"//include/envoy/grpc:async_client_interface",
"//source/common/buffer:zero_copy_input_stream_lib",
"//source/common/http:async_client_lib",
],
)
Expand Down Expand Up @@ -94,6 +95,7 @@ envoy_cc_library(
":common_lib",
"//include/envoy/grpc:rpc_channel_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/buffer:zero_copy_input_stream_lib",
"//source/common/common:assert_lib",
"//source/common/common:enum_to_int",
"//source/common/common:utility_lib",
Expand All @@ -112,3 +114,13 @@ envoy_cc_library(
"//include/envoy/config:subscription_interface",
],
)

envoy_cc_library(
name = "transcoder_input_stream_lib",
srcs = ["transcoder_input_stream_impl.cc"],
hdrs = ["transcoder_input_stream_impl.h"],
external_deps = ["grpc_transcoding"],
deps = [
"//source/common/buffer:zero_copy_input_stream_lib",
],
)
11 changes: 5 additions & 6 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "envoy/grpc/async_client.h"

#include "common/buffer/zero_copy_input_stream_impl.h"
#include "common/common/enum_to_int.h"
#include "common/common/linked_object.h"
#include "common/grpc/codec.h"
Expand Down Expand Up @@ -110,14 +111,12 @@ class AsyncClientStreamImpl : public AsyncClientStream<RequestType>,
return;
}

for (const auto& frame : decoded_frames_) {
for (auto& frame : decoded_frames_) {
std::unique_ptr<ResponseType> response(new ResponseType());
// TODO(htuch): We can avoid linearizing the buffer here when Buffer::Instance implements
// protobuf ZeroCopyInputStream.
// TODO(htuch): Need to add support for compressed responses as well here.
if (frame.flags_ != GRPC_FH_DEFAULT ||
!response->ParseFromArray(frame.data_->linearize(frame.data_->length()),
frame.data_->length())) {
Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_));

if (frame.flags_ != GRPC_FH_DEFAULT || !response->ParseFromZeroCopyStream(&stream)) {
streamError(Status::GrpcStatus::Internal);
return;
}
Expand Down
4 changes: 3 additions & 1 deletion source/common/grpc/rpc_channel_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstdint>
#include <string>

#include "common/buffer/zero_copy_input_stream_impl.h"
#include "common/common/enum_to_int.h"
#include "common/common/utility.h"
#include "common/grpc/common.h"
Expand Down Expand Up @@ -59,7 +60,8 @@ void RpcChannelImpl::onSuccess(Http::MessagePtr&& http_response) {
}

http_response->body()->drain(5);
if (!grpc_response_->ParseFromString(http_response->bodyAsString())) {
Buffer::ZeroCopyInputStreamImpl stream(std::move(http_response->body()));
if (!grpc_response_->ParseFromZeroCopyStream(&stream)) {
throw Exception(Optional<uint64_t>(), "bad serialized body");
}

Expand Down
9 changes: 9 additions & 0 deletions source/common/grpc/transcoder_input_stream_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#include "common/grpc/transcoder_input_stream_impl.h"

namespace Envoy {
namespace Grpc {

int64_t TranscoderInputStreamImpl::BytesAvailable() const { return buffer_->length() - position_; }

} // namespace Grpc
} // namespace Envoy
18 changes: 18 additions & 0 deletions source/common/grpc/transcoder_input_stream_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include "common/buffer/zero_copy_input_stream_impl.h"

#include "grpc_transcoding/transcoder_input_stream.h"

namespace Envoy {
namespace Grpc {

class TranscoderInputStreamImpl : public Buffer::ZeroCopyInputStreamImpl,
public google::grpc::transcoding::TranscoderInputStream {
public:
// TranscoderInputStream
virtual int64_t BytesAvailable() const override;
};

} // namespace Grpc
} // namespace Envoy
17 changes: 17 additions & 0 deletions test/common/buffer/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
licenses(["notice"]) # Apache 2

load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_test",
"envoy_package",
)

envoy_package()

envoy_cc_test(
name = "zero_copy_input_stream_test",
srcs = ["zero_copy_input_stream_test.cc"],
deps = [
"//source/common/buffer:zero_copy_input_stream_lib",
],
)
79 changes: 79 additions & 0 deletions test/common/buffer/zero_copy_input_stream_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#include "common/buffer/buffer_impl.h"
#include "common/buffer/zero_copy_input_stream_impl.h"

#include "gtest/gtest.h"

namespace Envoy {
namespace Buffer {
namespace {

class ZeroCopyInputStreamTest : public testing::Test {
public:
ZeroCopyInputStreamTest() {
Buffer::OwnedImpl buffer{"abcd"};
stream_.move(buffer);
}

std::string slice_data_{"abcd"};
ZeroCopyInputStreamImpl stream_;

const void* data_;
int size_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Prefer fixed size types in general in Envoy .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data_ and size_ are used to receive output from Next, so types here matches the interface.

};

TEST_F(ZeroCopyInputStreamTest, Move) {
Buffer::OwnedImpl buffer{"abcd"};
stream_.move(buffer);

EXPECT_EQ(0, buffer.length());
}

TEST_F(ZeroCopyInputStreamTest, Next) {
EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_EQ(4, size_);
EXPECT_EQ(0, memcmp(slice_data_.data(), data_, size_));
}

TEST_F(ZeroCopyInputStreamTest, TwoSlices) {
Buffer::OwnedImpl buffer("efgh");

stream_.move(buffer);

EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_EQ(4, size_);
EXPECT_EQ(0, memcmp(slice_data_.data(), data_, size_));
EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_EQ(4, size_);
EXPECT_EQ(0, memcmp("efgh", data_, size_));
}

TEST_F(ZeroCopyInputStreamTest, BackUp) {
EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_EQ(4, size_);

stream_.BackUp(3);
EXPECT_EQ(1, stream_.ByteCount());

EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_EQ(3, size_);
EXPECT_EQ(0, memcmp("bcd", data_, size_));
EXPECT_EQ(4, stream_.ByteCount());
}

TEST_F(ZeroCopyInputStreamTest, ByteCount) {
EXPECT_EQ(0, stream_.ByteCount());
EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_EQ(4, stream_.ByteCount());
}

TEST_F(ZeroCopyInputStreamTest, Finish) {
EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_EQ(0, size_);
stream_.finish();
EXPECT_FALSE(stream_.Next(&data_, &size_));
}

} // namespace
} // namespace Buffer
} // namespace Envoy
9 changes: 9 additions & 0 deletions test/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,12 @@ envoy_cc_test(
"//test/test_common:utility_lib",
],
)

envoy_cc_test(
name = "transcoder_input_stream_test",
srcs = ["transcoder_input_stream_test.cc"],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/grpc:transcoder_input_stream_lib",
],
)
Loading