Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Version history
* admin: added support for displaying ip address subject alternate names in :ref:`certs<operations_admin_interface_certs>` end point.
* buffer: force copy when appending small slices to OwnedImpl buffer to avoid fragmentation.
* config: use type URL to select an extension whenever the config type URL (or its previous versions) uniquely identify a typed extension, see :ref:`extension configuration <config_overview_extension_configuration>`.
* grpc-json: added support for building HTTP request into
`google.api.HttpBody <https://github.com/googleapis/googleapis/blob/master/google/api/httpbody.proto>`_.
* config: added stat :ref:`update_time <config_cluster_manager_cds>`.
* datasource: added retry policy for remote async data source.
* dns: the STRICT_DNS cluster now only resolves to 0 hosts if DNS resolution successfully returns 0 hosts.
Expand Down
8 changes: 8 additions & 0 deletions source/common/grpc/codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ void Encoder::newFrame(uint8_t flags, uint64_t length, std::array<uint8_t, 5>& o
output[4] = static_cast<uint8_t>(length);
}

void Encoder::prependFrameHeader(uint8_t flags, Buffer::Instance& buffer) {
// Compute the size of the payload and construct the length prefix.
std::array<uint8_t, Grpc::GRPC_FRAME_HEADER_SIZE> frame;
Grpc::Encoder().newFrame(flags, buffer.length(), frame);
Buffer::OwnedImpl frame_buffer(frame.data(), frame.size());
buffer.prepend(frame_buffer);
}

bool Decoder::decode(Buffer::Instance& input, std::vector<Frame>& output) {
decoding_error_ = false;
output_ = &output;
Expand Down
5 changes: 5 additions & 0 deletions source/common/grpc/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class Encoder {
// @param length supplies the GRPC data frame length.
// @param output the buffer to store the encoded data. Its size must be 5.
void newFrame(uint8_t flags, uint64_t length, std::array<uint8_t, 5>& output);

// Prepend the gRPC frame into the buffer.
// @param flags supplies the GRPC data frame flags.
// @param buffer the buffer with the message payload.
void prependFrameHeader(uint8_t flags, Buffer::Instance& buffer);
};

// Wire format (http://www.grpc.io/docs/guides/wire.html) of GRPC data frame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,12 @@ Http::FilterTrailersStatus Filter::encodeTrailers(Http::ResponseTrailerMap& trai
}

void Filter::buildGrpcFrameHeader(Buffer::Instance& buffer) {
// Compute the size of the payload and construct the length prefix.
//
// We do this even if the upstream failed: If the response returned non-200,
// we'll respond with a grpc-status with an error, so clients will know that the request
// was unsuccessful. Since we're guaranteed at this point to have a valid response
// (unless upstream lied in content-type) we attempt to return a well-formed gRPC
// response body.
const auto length = buffer.length();
std::array<uint8_t, Grpc::GRPC_FRAME_HEADER_SIZE> frame;
Grpc::Encoder().newFrame(Grpc::GRPC_FH_DEFAULT, length, frame);
Buffer::OwnedImpl frame_buffer(frame.data(), frame.size());
buffer.prepend(frame_buffer);
Grpc::Encoder().prependFrameHeader(Grpc::GRPC_FH_DEFAULT, buffer);
}

} // namespace GrpcHttp1ReverseBridge
Expand Down
14 changes: 14 additions & 0 deletions source/extensions/filters/http/grpc_json_transcoder/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ envoy_cc_library(
"api_httpbody_protos",
],
deps = [
":http_body_utils_lib",
":transcoder_input_stream_lib",
"//include/envoy/http:filter_interface",
"//source/common/grpc:codec_lib",
Expand All @@ -33,6 +34,19 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "http_body_utils_lib",
srcs = ["http_body_utils.cc"],
hdrs = ["http_body_utils.h"],
external_deps = [
"api_httpbody_protos",
],
deps = [
"//source/common/grpc:codec_lib",
"//source/common/protobuf",
],
)

envoy_cc_library(
name = "transcoder_input_stream_lib",
srcs = ["transcoder_input_stream_impl.cc"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include "extensions/filters/http/grpc_json_transcoder/http_body_utils.h"

#include "google/api/httpbody.pb.h"

using Envoy::Protobuf::io::CodedOutputStream;
using Envoy::Protobuf::io::StringOutputStream;

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace GrpcJsonTranscoder {

void HttpBodyUtils::appendHttpBodyEnvelope(
Buffer::Instance& output, const std::vector<const Protobuf::Field*>& request_body_field_path,
std::string content_type, uint64_t content_length) {
// Manually encode the protobuf envelope for the body.
// See https://developers.google.com/protocol-buffers/docs/encoding#embedded for wire format.

// Embedded messages are treated the same way as strings (wire type 2).
constexpr uint32_t ProtobufLengthDelimitedField = 2;

std::string proto_envelope;
{
// For memory safety, the StringOutputStream needs to be destroyed before
// we read the string.

const uint32_t http_body_field_number =
(google::api::HttpBody::kDataFieldNumber << 3) | ProtobufLengthDelimitedField;

::google::api::HttpBody body;
body.set_content_type(std::move(content_type));

uint64_t envelope_size = body.ByteSizeLong() +
CodedOutputStream::VarintSize32(http_body_field_number) +
CodedOutputStream::VarintSize64(content_length);
std::vector<uint32_t> message_sizes;
message_sizes.reserve(request_body_field_path.size());
for (auto it = request_body_field_path.rbegin(); it != request_body_field_path.rend(); ++it) {
const Protobuf::Field* field = *it;
const uint64_t message_size = envelope_size + content_length;
const uint32_t field_number = (field->number() << 3) | ProtobufLengthDelimitedField;
const uint64_t field_size = CodedOutputStream::VarintSize32(field_number) +
CodedOutputStream::VarintSize64(message_size);
message_sizes.push_back(message_size);
envelope_size += field_size;
}
std::reverse(message_sizes.begin(), message_sizes.end());

proto_envelope.reserve(envelope_size);

Envoy::Protobuf::io::StringOutputStream string_stream(&proto_envelope);
Envoy::Protobuf::io::CodedOutputStream coded_stream(&string_stream);

// Serialize body field definition manually to avoid the copy of the body.
for (size_t i = 0; i < request_body_field_path.size(); ++i) {
const Protobuf::Field* field = request_body_field_path[i];
const uint32_t field_number = (field->number() << 3) | ProtobufLengthDelimitedField;
const uint64_t message_size = message_sizes[i];
coded_stream.WriteTag(field_number);
coded_stream.WriteVarint64(message_size);
}
body.SerializeToCodedStream(&coded_stream);
coded_stream.WriteTag(http_body_field_number);
coded_stream.WriteVarint64(content_length);
}

output.add(proto_envelope);
}

} // namespace GrpcJsonTranscoder
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "envoy/buffer/buffer.h"

#include "common/buffer/buffer_impl.h"
#include "common/grpc/codec.h"
#include "common/protobuf/protobuf.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace GrpcJsonTranscoder {

class HttpBodyUtils {
public:
static void
appendHttpBodyEnvelope(Buffer::Instance& output,
const std::vector<const Protobuf::Field*>& request_body_field_path,
std::string content_type, uint64_t content_length);
};

} // namespace GrpcJsonTranscoder
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Loading