Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions source/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ envoy_cc_library(
hdrs = ["async_client_manager_impl.h"],
deps = [
":async_client_lib",
":common_lib",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/singleton:manager_interface",
"//include/envoy/thread_local:thread_local_interface",
Expand Down Expand Up @@ -82,6 +83,25 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "google_grpc_utils_lib",
srcs = ["google_grpc_utils.cc"],
hdrs = ["google_grpc_utils.h"],
external_deps = [
"abseil_optional",
"grpc",
],
deps = [
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:enum_to_int",
"//source/common/common:macros",
"//source/common/common:utility_lib",
"//source/common/grpc:status_lib",
],
)

envoy_cc_library(
name = "google_async_client_lib",
srcs = ["google_async_client_impl.cc"],
Expand All @@ -91,6 +111,7 @@ envoy_cc_library(
"grpc",
],
deps = [
":common_lib",
":google_grpc_creds_lib",
"//include/envoy/api:api_interface",
"//include/envoy/grpc:google_grpc_creds_interface",
Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void AsyncStreamImpl::onReset() {
}

void AsyncStreamImpl::sendMessage(const Protobuf::Message& request, bool end_stream) {
stream_->sendData(*Common::serializeBody(request), end_stream);
stream_->sendData(*Common::serializeToGrpcFrame(request), end_stream);
}

void AsyncStreamImpl::closeStream() {
Expand Down
29 changes: 28 additions & 1 deletion source/common/grpc/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <arpa/inet.h>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <string>
Expand All @@ -12,6 +13,7 @@
#include "common/common/enum_to_int.h"
#include "common/common/fmt.h"
#include "common/common/macros.h"
#include "common/common/stack_array.h"
#include "common/common/utility.h"
#include "common/http/headers.h"
#include "common/http/message_impl.h"
Expand Down Expand Up @@ -112,9 +114,11 @@ bool Common::resolveServiceAndMethod(const Http::HeaderEntry* path, std::string*
return true;
}

Buffer::InstancePtr Common::serializeBody(const Protobuf::Message& message) {
Buffer::InstancePtr Common::serializeToGrpcFrame(const Protobuf::Message& message) {
// http://www.grpc.io/docs/guides/wire.html
// Reserve enough space for the entire message and the 5 byte header.
// NB: we do not use prependGrpcFrameHeader because that would add another BufferFragment and this
// (using a single BufferFragment) is more efficient.
Buffer::InstancePtr body(new Buffer::OwnedImpl());
const uint32_t size = message.ByteSize();
const uint32_t alloc_size = size + 5;
Expand All @@ -134,6 +138,21 @@ Buffer::InstancePtr Common::serializeBody(const Protobuf::Message& message) {
return body;
}

Buffer::InstancePtr Common::serializeMessage(const Protobuf::Message& message) {
auto body = std::make_unique<Buffer::OwnedImpl>();
const uint32_t size = message.ByteSize();
Buffer::RawSlice iovec;
body->reserve(size, &iovec, 1);
ASSERT(iovec.len_ >= size);
iovec.len_ = size;
uint8_t* current = reinterpret_cast<uint8_t*>(iovec.mem_);
Protobuf::io::ArrayOutputStream stream(current, size, -1);
Protobuf::io::CodedOutputStream codec_stream(&stream);
message.SerializeWithCachedSizes(&codec_stream);
body->commit(&iovec, 1);
return body;
}

std::chrono::milliseconds Common::getGrpcTimeout(Http::HeaderMap& request_headers) {
std::chrono::milliseconds timeout(0);
Http::HeaderEntry* header_grpc_timeout_entry = request_headers.GrpcTimeout();
Expand Down Expand Up @@ -267,5 +286,13 @@ std::string Common::typeUrl(const std::string& qualified_name) {
return typeUrlPrefix() + "/" + qualified_name;
}

void Common::prependGrpcFrameHeader(Buffer::Instance& buffer) {
std::array<char, 5> header;
header[0] = 0; // flags
const uint32_t nsize = htonl(buffer.length());
std::memcpy(&header[1], reinterpret_cast<const void*>(&nsize), sizeof(uint32_t));
buffer.prepend(absl::string_view(&header[0], 5));
}

} // namespace Grpc
} // namespace Envoy
15 changes: 13 additions & 2 deletions source/common/grpc/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,14 @@ class Common {
std::string* method);

/**
* Serialize protobuf message.
* Serialize protobuf message with gRPC frame header.
*/
static Buffer::InstancePtr serializeBody(const Protobuf::Message& message);
static Buffer::InstancePtr serializeToGrpcFrame(const Protobuf::Message& message);

/**
* Serialize protobuf message. Without grpc header.
*/
static Buffer::InstancePtr serializeMessage(const Protobuf::Message& message);

/**
* Prepare headers for protobuf service.
Expand All @@ -148,6 +153,12 @@ class Common {
*/
static std::string typeUrl(const std::string& qualified_name);

/**
* Prepend a gRPC frame header to a Buffer::Instance containing a single gRPC frame.
* @param buffer containing the frame data which will be modified.
*/
static void prependGrpcFrameHeader(Buffer::Instance& buffer);

private:
static void checkForHeaderOnlyError(Http::Message& http_response);
};
Expand Down
108 changes: 108 additions & 0 deletions source/common/grpc/google_grpc_utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include "common/grpc/google_grpc_utils.h"

#include <atomic>
#include <cstdint>
#include <cstring>
#include <string>

#include "common/buffer/buffer_impl.h"
#include "common/common/assert.h"
#include "common/common/empty_string.h"
#include "common/common/enum_to_int.h"
#include "common/common/fmt.h"
#include "common/common/macros.h"
#include "common/common/stack_array.h"
#include "common/common/utility.h"

#include "absl/strings/match.h"

namespace Envoy {
namespace Grpc {

struct BufferInstanceContainer {
BufferInstanceContainer(int ref_count, Buffer::InstancePtr&& buffer)
: ref_count_(ref_count), buffer_(std::move(buffer)) {}
std::atomic<uint32_t> ref_count_; // In case gPRC dereferences in a different threads.
Buffer::InstancePtr buffer_;

static void derefBufferInstanceContainer(void* container_ptr) {
auto container = static_cast<BufferInstanceContainer*>(container_ptr);
container->ref_count_--;
// This is safe because the ref_count_ is never incremented.
if (container->ref_count_ <= 0) {
delete container;
}
}
};

grpc::ByteBuffer GoogleGrpcUtils::makeByteBuffer(Buffer::InstancePtr&& buffer_instance) {
if (!buffer_instance) {
return {};
}
Buffer::RawSlice on_raw_slice;
// NB: we need to pass in >= 1 in order to get the real "n" (see Buffer::Instance for details).
const int n_slices = buffer_instance->getRawSlices(&on_raw_slice, 1);
if (n_slices <= 0) {
return {};
}
auto* container = new BufferInstanceContainer{n_slices, std::move(buffer_instance)};
if (n_slices == 1) {
grpc::Slice one_slice(on_raw_slice.mem_, on_raw_slice.len_,
&BufferInstanceContainer::derefBufferInstanceContainer, container);
return {&one_slice, 1};
}
STACK_ARRAY(many_raw_slices, Buffer::RawSlice, n_slices);
container->buffer_->getRawSlices(many_raw_slices.begin(), n_slices);
std::vector<grpc::Slice> slices;
slices.reserve(n_slices);
for (int i = 0; i < n_slices; i++) {
slices.emplace_back(many_raw_slices[i].mem_, many_raw_slices[i].len_,
&BufferInstanceContainer::derefBufferInstanceContainer, container);
}
return {&slices[0], slices.size()};
}

struct ByteBufferContainer {
ByteBufferContainer(int ref_count) : ref_count_(ref_count) {}
~ByteBufferContainer() { ::free(fragments_); }
uint32_t ref_count_;
Buffer::BufferFragmentImpl* fragments_ = nullptr;
std::vector<grpc::Slice> slices_;
};

Buffer::InstancePtr GoogleGrpcUtils::makeBufferInstance(const grpc::ByteBuffer& byte_buffer) {
auto buffer = std::make_unique<Buffer::OwnedImpl>();
if (byte_buffer.Length() == 0) {
return buffer;
}
// NB: ByteBuffer::Dump moves the data out of the ByteBuffer so we need to ensure that the
// lifetime of the Slice(s) exceeds our Buffer::Instance.
std::vector<grpc::Slice> slices;
byte_buffer.Dump(&slices);
auto* container = new ByteBufferContainer(static_cast<int>(slices.size()));
std::function<void(const void*, size_t, const Buffer::BufferFragmentImpl*)> releaser =
[container](const void*, size_t, const Buffer::BufferFragmentImpl*) {
container->ref_count_--;
if (container->ref_count_ <= 0) {
delete container;
}
};
// NB: addBufferFragment takes a pointer alias to the BufferFragmentImpl which is passed in so we
// need to ensure that the lifetime of those objects exceeds that of the Buffer::Instance.
RELEASE_ASSERT(!::posix_memalign(reinterpret_cast<void**>(&container->fragments_),
alignof(Buffer::BufferFragmentImpl),
sizeof(Buffer::BufferFragmentImpl) * slices.size()),
"posix_memalign failure");
for (size_t i = 0; i < slices.size(); i++) {
new (&container->fragments_[i])
Buffer::BufferFragmentImpl(slices[i].begin(), slices[i].size(), releaser);
}
for (size_t i = 0; i < slices.size(); i++) {
buffer->addBufferFragment(container->fragments_[i]);
}
container->slices_ = std::move(slices);
return buffer;
}

} // namespace Grpc
} // namespace Envoy
33 changes: 33 additions & 0 deletions source/common/grpc/google_grpc_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include <cstdint>
#include <string>

#include "envoy/buffer/buffer.h"

#include "grpcpp/grpcpp.h"

namespace Envoy {
namespace Grpc {

class GoogleGrpcUtils {
public:
/**
* Build grpc::ByteBuffer which aliases the data in a Buffer::InstancePtr.
* @param buffer source data container.
* @return byteBuffer target container aliased to the data in Buffer::Instance and owning the
* Buffer::Instance.
*/
static grpc::ByteBuffer makeByteBuffer(Buffer::InstancePtr&& buffer);

/**
* Build Buffer::Instance which aliases the data in a grpc::ByteBuffer.
* @param buffer source data container.
* @return a Buffer::InstancePtr aliased to the data in the provided grpc::ByteBuffer and
* owning the corresponding grpc::Slice(s).
*/
static Buffer::InstancePtr makeBufferInstance(const grpc::ByteBuffer& buffer);
};

} // namespace Grpc
} // namespace Envoy
2 changes: 1 addition & 1 deletion source/common/upstream/health_checker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onInterval() {
request.set_service(parent_.service_name_.value());
}

request_encoder_->encodeData(*Grpc::Common::serializeBody(request), true);
request_encoder_->encodeData(*Grpc::Common::serializeToGrpcFrame(request), true);
}

void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onResetStream(Http::StreamResetReason,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void LightStepDriver::LightStepTransporter::Send(const Protobuf::Message& reques
Http::MessagePtr message = Grpc::Common::prepareHeaders(
driver_.cluster()->name(), lightstep::CollectorServiceFullName(),
lightstep::CollectorMethodName(), absl::optional<std::chrono::milliseconds>(timeout));
message->body() = Grpc::Common::serializeBody(request);
message->body() = Grpc::Common::serializeToGrpcFrame(request);

active_request_ =
driver_.clusterManager()
Expand Down
12 changes: 12 additions & 0 deletions test/common/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "google_grpc_utils_test",
srcs = envoy_select_google_grpc(["google_grpc_utils_test.cc"]),
deps = [
"//source/common/grpc:common_lib",
"//source/common/http:headers_lib",
"//test/mocks/upstream:upstream_mocks",
"//test/proto:helloworld_proto_cc",
"//test/test_common:utility_lib",
] + envoy_select_google_grpc(["//source/common/grpc:google_grpc_utils_lib"]),
)

envoy_cc_test(
name = "google_async_client_impl_test",
srcs = envoy_select_google_grpc(["google_async_client_impl_test.cc"]),
Expand Down
15 changes: 15 additions & 0 deletions test/common/grpc/common_test.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <arpa/inet.h>

#include "common/grpc/common.h"
#include "common/http/headers.h"
#include "common/http/message_impl.h"
Expand Down Expand Up @@ -349,5 +351,18 @@ TEST(GrpcCommonTest, ValidateResponse) {
}
}

// Ensure that the correct gPRC header is constructed for a Buffer::Instance.
TEST(GrpcCommonTest, PrependGrpcFrameHeader) {
auto buffer = std::make_unique<Buffer::OwnedImpl>();
buffer->add("test", 4);
std::array<char, 5> expected_header;
expected_header[0] = 0; // flags
const uint32_t nsize = htonl(4);
std::memcpy(&expected_header[1], reinterpret_cast<const void*>(&nsize), sizeof(uint32_t));
std::string header_string(&expected_header[0], 5);
Common::prependGrpcFrameHeader(*buffer);
EXPECT_EQ(buffer->toString(), header_string + "test");
}

} // namespace Grpc
} // namespace Envoy
Loading