Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ add_library(
filter/ratelimit.cc
filter/tcp_proxy.cc
generated/ratelimit.pb.cc
grpc/codec.cc
grpc/common.cc
grpc/http1_bridge_filter.cc
grpc/rpc_channel_impl.cc
Expand Down
89 changes: 89 additions & 0 deletions source/common/grpc/codec.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#include "common/grpc/codec.h"

#include "common/buffer/buffer_impl.h"

namespace Grpc {

Encoder::Encoder() {}

void Encoder::newFrame(uint8_t flags, uint64_t length, std::array<uint8_t, 5>& output) {
output[0] = flags;
output[1] = static_cast<uint8_t>(length >> 24);
output[2] = static_cast<uint8_t>(length >> 16);
output[3] = static_cast<uint8_t>(length >> 8);
output[4] = static_cast<uint8_t>(length);
}

Decoder::Decoder() : state_(State::FH_FLAG) {}

bool Decoder::decode(Buffer::Instance& input, std::vector<Frame>& output) {
uint64_t count = input.getRawSlices(nullptr, 0);
Buffer::RawSlice slices[count];
input.getRawSlices(slices, count);
for (Buffer::RawSlice& slice : slices) {
uint8_t* mem = reinterpret_cast<uint8_t*>(slice.mem_);
for (uint64_t j = 0; j < slice.len_;) {
uint8_t c = *mem;
switch (state_) {
case State::FH_FLAG:
if (c & ~GRPC_FH_COMPRESSED) {
// Unsupported flags.
return false;
}
frame_.flags_ = c;
state_ = State::FH_LEN_0;
mem++;
j++;
break;
case State::FH_LEN_0:
frame_.length_ = static_cast<uint32_t>(c) << 24;
state_ = State::FH_LEN_1;
mem++;
j++;
break;
case State::FH_LEN_1:
frame_.length_ |= static_cast<uint32_t>(c) << 16;
state_ = State::FH_LEN_2;
mem++;
j++;
break;
case State::FH_LEN_2:
frame_.length_ |= static_cast<uint32_t>(c) << 8;
state_ = State::FH_LEN_3;
mem++;
j++;
break;
case State::FH_LEN_3:
frame_.length_ |= static_cast<uint32_t>(c);
frame_.data_.reset(new Buffer::OwnedImpl());
state_ = State::DATA;
mem++;
j++;
break;
case State::DATA:
uint64_t remain_in_buffer = slice.len_ - j;
uint64_t remain_in_frame = frame_.length_ - frame_.data_->length();
if (remain_in_buffer <= remain_in_frame) {
frame_.data_->add(mem, remain_in_buffer);
mem += remain_in_buffer;
j += remain_in_buffer;
} else {
frame_.data_->add(mem, remain_in_frame);
mem += remain_in_frame;
j += remain_in_frame;
}
if (frame_.length_ == frame_.data_->length()) {
output.push_back(std::move(frame_));
frame_.flags_ = 0;
frame_.length_ = 0;
state_ = State::FH_FLAG;
}
break;
}
}
}
input.drain(input.length());
return true;
}

} // namespace Grpc
79 changes: 79 additions & 0 deletions source/common/grpc/codec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#pragma once

#include "envoy/buffer/buffer.h"

namespace Grpc {
// Last bit for an expanded message without compression.
const uint8_t GRPC_FH_DEFAULT = 0b0u;
// Last bit for a compressed message.
const uint8_t GRPC_FH_COMPRESSED = 0b1u;

enum class CompressionAlgorithm { None, Gzip };

struct Frame {
uint8_t flags_;
uint32_t length_;
Buffer::InstancePtr data_;
};

class Encoder {
public:
Encoder();

// Creates a new GRPC data frame with the given flags and length.
// @param flags supplies the GRPC data frame flags.
// @param length supplies the GRPC data frame length.
// @param output the buffer to store the encoded data. Its size must be 5.
void newFrame(uint8_t flags, uint64_t length, std::array<uint8_t, 5>& output);
};

class Decoder {
public:
Decoder();

// Decodes the given buffer with GRPC data frame. Drains the input buffer when
// decoding succeeded (returns true). If the input is not sufficient to make a
// complete GRPC data frame, it will be buffered in the decoder. If a decoding
// error happened, the input buffer remains unchanged.
// @param input supplies the binary octets wrapped in a GRPC data frame.
// @param output supplies the buffer to store the decoded data.
// @return bool whether the decoding succeeded or not.
bool decode(Buffer::Instance& input, std::vector<Frame>& output);

private:
// Wire format (http://www.grpc.io/docs/guides/wire.html) of GRPC data frame
// header:
//
// -----------------------------------------------------------------------
// |R|R|R|R|R|R|R|R|C| L | L | L | L |
// -----------------------------------------------------------------------
// Flag (1 byte) Message Length (4 bytes)
//
// A fixed header consists of five bytes.
// The first byte is the Flag. The last one "C" bit indicates if the message
// is compressed or not (0 is uncompressed, 1 is compressed). The other seven
// "R" bits are reserved for future use.
// The next four "L" bytes represent the message length in BigEndian format.
enum class State {
// Waiting for decoding the flags (1 byte) of the GRPC data frame.
FH_FLAG,
// Waiting for decoding the 1st byte of the length (4 bytes in total) of the
// GRPC data frame.
FH_LEN_0,
// Waiting for decoding the 2nd byte of the length (4 bytes in total) of the
// GRPC data frame.
FH_LEN_1,
// Waiting for decoding the 3rd byte of the length (4 bytes in total) of the
// GRPC data frame.
FH_LEN_2,
// Waiting for decoding the 4th byte of the length (4 bytes in total) of the
// GRPC data frame.
FH_LEN_3,
// Waiting for decoding the data.
DATA,
};

State state_;
Frame frame_;
};
} // Grpc
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ add_executable(envoy-test
common/event/file_event_impl_test.cc
common/filesystem/filesystem_impl_test.cc
common/filesystem/watcher_impl_test.cc
common/grpc/codec_test.cc
common/grpc/common_test.cc
common/grpc/http1_bridge_filter_test.cc
common/grpc/rpc_channel_impl_test.cc
Expand Down
147 changes: 147 additions & 0 deletions test/common/grpc/codec_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#include "common/buffer/buffer_impl.h"
#include "common/grpc/codec.h"

#include "test/generated/helloworld.pb.h"

namespace Grpc {

TEST(GrpcCodecTest, encodeHeader) {
Encoder encoder;
std::array<uint8_t, 5> buffer;

encoder.newFrame(GRPC_FH_DEFAULT, 1, buffer);
EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT);
EXPECT_EQ(buffer[1], 0);
EXPECT_EQ(buffer[2], 0);
EXPECT_EQ(buffer[3], 0);
EXPECT_EQ(buffer[4], 1);

encoder.newFrame(GRPC_FH_COMPRESSED, 1, buffer);
EXPECT_EQ(buffer[0], GRPC_FH_COMPRESSED);
EXPECT_EQ(buffer[1], 0);
EXPECT_EQ(buffer[2], 0);
EXPECT_EQ(buffer[3], 0);
EXPECT_EQ(buffer[4], 1);

encoder.newFrame(GRPC_FH_DEFAULT, 0x100, buffer);
EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT);
EXPECT_EQ(buffer[1], 0);
EXPECT_EQ(buffer[2], 0);
EXPECT_EQ(buffer[3], 1);
EXPECT_EQ(buffer[4], 0);

encoder.newFrame(GRPC_FH_DEFAULT, 0x10000, buffer);
EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT);
EXPECT_EQ(buffer[1], 0);
EXPECT_EQ(buffer[2], 1);
EXPECT_EQ(buffer[3], 0);
EXPECT_EQ(buffer[4], 0);

encoder.newFrame(GRPC_FH_DEFAULT, 0x1000000, buffer);
EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT);
EXPECT_EQ(buffer[1], 1);
EXPECT_EQ(buffer[2], 0);
EXPECT_EQ(buffer[3], 0);
EXPECT_EQ(buffer[4], 0);
}

TEST(GrpcCodecTest, decodeIncompleteFrame) {
helloworld::HelloRequest request;
request.set_name("hello");
std::string request_buffer = request.SerializeAsString();

Buffer::OwnedImpl buffer;
std::array<uint8_t, 5> header;
Encoder encoder;
encoder.newFrame(GRPC_FH_DEFAULT, request.ByteSize(), header);
buffer.add(header.data(), 5);
buffer.add(request_buffer.c_str(), 5);

std::vector<Frame> frames;
Decoder decoder;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(static_cast<size_t>(0), frames.size());

buffer.add(request_buffer.c_str() + 5);
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(static_cast<size_t>(1), frames.size());
helloworld::HelloRequest decoded_request;
EXPECT_TRUE(decoded_request.ParseFromArray(frames[0].data_->linearize(frames[0].data_->length()),
frames[0].data_->length()));
EXPECT_EQ("hello", decoded_request.name());
}

TEST(GrpcCodecTest, decodeInvalidFrame) {
helloworld::HelloRequest request;
request.set_name("hello");

Buffer::OwnedImpl buffer;
std::array<uint8_t, 5> header;
Encoder encoder;
encoder.newFrame(0b10u, request.ByteSize(), header);
buffer.add(header.data(), 5);
buffer.add(request.SerializeAsString());
size_t size = buffer.length();

std::vector<Frame> frames;
Decoder decoder;
EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_EQ(size, buffer.length());
}

TEST(GrpcCodecTest, decodeSingleFrame) {
helloworld::HelloRequest request;
request.set_name("hello");

Buffer::OwnedImpl buffer;
std::array<uint8_t, 5> header;
Encoder encoder;
encoder.newFrame(GRPC_FH_DEFAULT, request.ByteSize(), header);
buffer.add(header.data(), 5);
buffer.add(request.SerializeAsString());

std::vector<Frame> frames;
Decoder decoder;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(frames.size(), static_cast<uint64_t>(1));
EXPECT_EQ(GRPC_FH_DEFAULT, frames[0].flags_);
EXPECT_EQ(static_cast<uint64_t>(request.ByteSize()), frames[0].length_);

helloworld::HelloRequest result;
result.ParseFromArray(frames[0].data_->linearize(frames[0].data_->length()),
frames[0].data_->length());
EXPECT_EQ("hello", result.name());
}

TEST(GrpcCodecTest, decodeMultipleFrame) {
helloworld::HelloRequest request;
request.set_name("hello");

Buffer::OwnedImpl buffer;
std::array<uint8_t, 5> header;
Encoder encoder;
encoder.newFrame(GRPC_FH_DEFAULT, request.ByteSize(), header);
for (int i = 0; i < 1009; i++) {
buffer.add(header.data(), 5);
buffer.add(request.SerializeAsString());
}

std::vector<Frame> frames;
Decoder decoder;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(frames.size(), static_cast<uint64_t>(1009));
for (Frame& frame : frames) {
EXPECT_EQ(GRPC_FH_DEFAULT, frame.flags_);
EXPECT_EQ(static_cast<uint64_t>(request.ByteSize()), frame.length_);

helloworld::HelloRequest result;
result.ParseFromArray(frame.data_->linearize(frame.data_->length()), frame.data_->length());
EXPECT_EQ("hello", result.name());
}
}

} // Grpc