-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Add codec to encode/decode GRPC data frame. #414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
4f52eb5
02c7880
bf3037c
d2e7195
9f4babe
b5a5019
62c7b05
549a9b4
e829410
cd47044
8965a8b
7d4bac2
876edf1
281f781
595abc6
fadd7a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| #include "common/grpc/codec.h" | ||
|
|
||
| #include "common/buffer/buffer_impl.h" | ||
|
|
||
| namespace Grpc { | ||
|
|
||
| Encoder::Encoder() {} | ||
|
|
||
| void Encoder::newFrame(uint8_t flags, uint64_t length, std::array<uint8_t, 5>& output) { | ||
| output[0] = flags; | ||
| output[1] = static_cast<uint8_t>(length >> 24); | ||
| output[2] = static_cast<uint8_t>(length >> 16); | ||
| output[3] = static_cast<uint8_t>(length >> 8); | ||
| output[4] = static_cast<uint8_t>(length); | ||
| } | ||
|
|
||
| Decoder::Decoder() : state_(State::FH_FLAG) {} | ||
|
|
||
| bool Decoder::decode(Buffer::Instance& input, std::vector<Frame>& output) { | ||
| uint64_t count = input.getRawSlices(nullptr, 0); | ||
| Buffer::RawSlice slices[count]; | ||
| input.getRawSlices(slices, count); | ||
| for (Buffer::RawSlice& slice : slices) { | ||
| uint8_t* mem = reinterpret_cast<uint8_t*>(slice.mem_); | ||
| for (uint64_t j = 0; j < slice.len_;) { | ||
| uint8_t c = *mem; | ||
| switch (state_) { | ||
| case State::FH_FLAG: | ||
| if (c & ~GRPC_FH_COMPRESSED) { | ||
| // Unsupported flags. | ||
| return false; | ||
| } | ||
| frame_.flags_ = c; | ||
| state_ = State::FH_LEN_0; | ||
| mem++; | ||
| j++; | ||
| break; | ||
| case State::FH_LEN_0: | ||
| frame_.length_ = static_cast<uint32_t>(c) << 24; | ||
| state_ = State::FH_LEN_1; | ||
| mem++; | ||
| j++; | ||
| break; | ||
| case State::FH_LEN_1: | ||
| frame_.length_ |= static_cast<uint32_t>(c) << 16; | ||
| state_ = State::FH_LEN_2; | ||
| mem++; | ||
| j++; | ||
| break; | ||
| case State::FH_LEN_2: | ||
| frame_.length_ |= static_cast<uint32_t>(c) << 8; | ||
| state_ = State::FH_LEN_3; | ||
| mem++; | ||
| j++; | ||
| break; | ||
| case State::FH_LEN_3: | ||
| frame_.length_ |= static_cast<uint32_t>(c); | ||
| frame_.data_.reset(new Buffer::OwnedImpl()); | ||
| state_ = State::DATA; | ||
| mem++; | ||
| j++; | ||
| break; | ||
| case State::DATA: | ||
| uint64_t remain_in_buffer = slice.len_ - j; | ||
| uint64_t remain_in_frame = frame_.length_ - frame_.data_->length(); | ||
| if (remain_in_buffer <= remain_in_frame) { | ||
| frame_.data_->add(mem, remain_in_buffer); | ||
| mem += remain_in_buffer; | ||
| j += remain_in_buffer; | ||
| } else { | ||
| frame_.data_->add(mem, remain_in_frame); | ||
| mem += remain_in_frame; | ||
| j += remain_in_frame; | ||
| } | ||
| if (frame_.length_ == frame_.data_->length()) { | ||
| output.push_back(std::move(frame_)); | ||
| frame_.flags_ = 0; | ||
| frame_.length_ = 0; | ||
| state_ = State::FH_FLAG; | ||
| } | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| } // namespace Grpc | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| #pragma once | ||
|
|
||
| #include "envoy/buffer/buffer.h" | ||
|
|
||
| namespace Grpc { | ||
| // Last bit for an expanded message without compression. | ||
| const uint8_t GRPC_FH_DEFAULT = 0b0u; | ||
| // Last bit for a compressed message. | ||
| const uint8_t GRPC_FH_COMPRESSED = 0b1u; | ||
|
|
||
| enum class CompressionAlgorithm { None, Gzip }; | ||
|
|
||
| struct Frame { | ||
| uint8_t flags_; | ||
| uint32_t length_; | ||
| Buffer::InstancePtr data_; | ||
| }; | ||
|
|
||
| class Encoder { | ||
| public: | ||
| Encoder(); | ||
|
|
||
| // Creates a new GRPC data frame with the given flags and length. | ||
| // @param flags supplies the GRPC data frame flags. | ||
| // @param length supplies the GRPC data frame length. | ||
| // @param output the buffer to store the encoded data. Its size must be 5. | ||
| void newFrame(uint8_t flags, uint64_t length, std::array<uint8_t, 5>& output); | ||
| }; | ||
|
|
||
| class Decoder { | ||
| public: | ||
| Decoder(); | ||
|
|
||
| // Decodes the given buffer with GRPC data frame. | ||
| // @param input supplies the binary octets wrapped in a GRPC data frame. | ||
| // @param output supplies the buffer to store the decoded data. | ||
| // @return bool whether the decoding succeeded or not. | ||
| bool decode(Buffer::Instance& input, std::vector<Frame>& output); | ||
|
|
||
| private: | ||
| // Wire format (http://www.grpc.io/docs/guides/wire.html) of GRPC data frame | ||
| // header: | ||
| // | ||
| // ----------------------------------------------------------------------- | ||
| // |R|R|R|R|R|R|R|R|C| L | L | L | L | | ||
| // ----------------------------------------------------------------------- | ||
| // Flag (1 byte) Message Length (4 bytes) | ||
| // | ||
| // A fixed header consists of five bytes. | ||
| // The first byte is the Flag. The last one "C" bit indicates if the message | ||
| // is compressed or not (0 is uncompressed, 1 is compressed). The other seven | ||
| // "R" bits are reserved for future use. | ||
| // The next four "L" bytes represent the message length in BigEndian format. | ||
| enum class State { | ||
| // Waiting for decoding the flags (1 byte) of the GRPC data frame. | ||
| FH_FLAG, | ||
| // Waiting for decoding the 1st byte of the length (4 bytes in total) of the | ||
| // GRPC data frame. | ||
| FH_LEN_0, | ||
| // Waiting for decoding the 2nd byte of the length (4 bytes in total) of the | ||
| // GRPC data frame. | ||
| FH_LEN_1, | ||
| // Waiting for decoding the 3rd byte of the length (4 bytes in total) of the | ||
| // GRPC data frame. | ||
| FH_LEN_2, | ||
| // Waiting for decoding the 4th byte of the length (4 bytes in total) of the | ||
| // GRPC data frame. | ||
| FH_LEN_3, | ||
| // Waiting for decoding the data. | ||
| DATA, | ||
| }; | ||
|
|
||
| State state_; | ||
| Frame frame_; | ||
| }; | ||
| } // Grpc |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| #include "common/buffer/buffer_impl.h" | ||
| #include "common/grpc/codec.h" | ||
|
|
||
| #include "test/generated/helloworld.pb.h" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: newline before this line
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
|
||
| namespace Grpc { | ||
|
|
||
| TEST(CodecTest, encodeHeader) { | ||
| Encoder encoder; | ||
| std::array<uint8_t, 5> buffer; | ||
|
|
||
| encoder.newFrame(GRPC_FH_DEFAULT, 1, buffer); | ||
| EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT); | ||
| EXPECT_EQ(buffer[1], 0); | ||
| EXPECT_EQ(buffer[2], 0); | ||
| EXPECT_EQ(buffer[3], 0); | ||
| EXPECT_EQ(buffer[4], 1); | ||
|
|
||
| encoder.newFrame(GRPC_FH_COMPRESSED, 1, buffer); | ||
| EXPECT_EQ(buffer[0], GRPC_FH_COMPRESSED); | ||
| EXPECT_EQ(buffer[1], 0); | ||
| EXPECT_EQ(buffer[2], 0); | ||
| EXPECT_EQ(buffer[3], 0); | ||
| EXPECT_EQ(buffer[4], 1); | ||
|
|
||
| encoder.newFrame(GRPC_FH_DEFAULT, 0x100, buffer); | ||
| EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT); | ||
| EXPECT_EQ(buffer[1], 0); | ||
| EXPECT_EQ(buffer[2], 0); | ||
| EXPECT_EQ(buffer[3], 1); | ||
| EXPECT_EQ(buffer[4], 0); | ||
|
|
||
| encoder.newFrame(GRPC_FH_DEFAULT, 0x10000, buffer); | ||
| EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT); | ||
| EXPECT_EQ(buffer[1], 0); | ||
| EXPECT_EQ(buffer[2], 1); | ||
| EXPECT_EQ(buffer[3], 0); | ||
| EXPECT_EQ(buffer[4], 0); | ||
|
|
||
| encoder.newFrame(GRPC_FH_DEFAULT, 0x1000000, buffer); | ||
| EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT); | ||
| EXPECT_EQ(buffer[1], 1); | ||
| EXPECT_EQ(buffer[2], 0); | ||
| EXPECT_EQ(buffer[3], 0); | ||
| EXPECT_EQ(buffer[4], 0); | ||
| } | ||
|
|
||
| TEST(CodecTest, decodeInvalidFrame) { | ||
| helloworld::HelloRequest request; | ||
| request.set_name("hello"); | ||
|
|
||
| Buffer::OwnedImpl buffer; | ||
| std::array<uint8_t, 5> header; | ||
| Encoder encoder; | ||
| encoder.newFrame(0b10u, request.ByteSize(), header); | ||
| buffer.add(header.data(), 5); | ||
| buffer.add(request.SerializeAsString()); | ||
|
|
||
| std::vector<Frame> frames; | ||
| Decoder decoder; | ||
| EXPECT_FALSE(decoder.decode(buffer, frames)); | ||
| } | ||
|
|
||
| TEST(CodecTest, decodeSingleFrame) { | ||
| helloworld::HelloRequest request; | ||
| request.set_name("hello"); | ||
|
|
||
| Buffer::OwnedImpl buffer; | ||
| std::array<uint8_t, 5> header; | ||
| Encoder encoder; | ||
| encoder.newFrame(GRPC_FH_DEFAULT, request.ByteSize(), header); | ||
| buffer.add(header.data(), 5); | ||
| buffer.add(request.SerializeAsString()); | ||
|
|
||
| std::vector<Frame> frames; | ||
| Decoder decoder; | ||
| decoder.decode(buffer, frames); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. EXPECT_TRUE and in the case below
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| EXPECT_EQ(frames.size(), static_cast<uint64_t>(1)); | ||
| EXPECT_EQ(GRPC_FH_DEFAULT, frames[0].flags_); | ||
| EXPECT_EQ(static_cast<uint64_t>(request.ByteSize()), frames[0].length_); | ||
|
|
||
| helloworld::HelloRequest result; | ||
| result.ParseFromArray(frames[0].data_->linearize(frames[0].data_->length()), | ||
| frames[0].data_->length()); | ||
| EXPECT_EQ("hello", result.name()); | ||
| } | ||
|
|
||
| TEST(CodecTest, decodeMultipleFrame) { | ||
| helloworld::HelloRequest request; | ||
| request.set_name("hello"); | ||
|
|
||
| Buffer::OwnedImpl buffer; | ||
| std::array<uint8_t, 5> header; | ||
| Encoder encoder; | ||
| encoder.newFrame(GRPC_FH_DEFAULT, request.ByteSize(), header); | ||
| for (int i = 0; i < 1009; i++) { | ||
| buffer.add(header.data(), 5); | ||
| buffer.add(request.SerializeAsString()); | ||
| } | ||
|
|
||
| std::vector<Frame> frames; | ||
| Decoder decoder; | ||
| decoder.decode(buffer, frames); | ||
| EXPECT_EQ(frames.size(), static_cast<uint64_t>(1009)); | ||
| for (Frame& frame : frames) { | ||
| EXPECT_EQ(GRPC_FH_DEFAULT, frame.flags_); | ||
| EXPECT_EQ(static_cast<uint64_t>(request.ByteSize()), frame.length_); | ||
|
|
||
| helloworld::HelloRequest result; | ||
| result.ParseFromArray(frame.data_->linearize(frame.data_->length()), frame.data_->length()); | ||
| EXPECT_EQ("hello", result.name()); | ||
| } | ||
| } | ||
|
|
||
| } // Grpc | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: newline before this line
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might want to check
state_to see if the input was a partial GRPC frame? (e.g. frame length = 100 but only 10 bytes in input buffer.Also add a test for that case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test added.
You don't need to check the state_. If grpc call get aborted when decoding, you should be able to know that from the HTTP2 layer, as the stream will be reset. Any intermediate state and buffer can be discarded. The workflow would be keep feeding data to the decoder as they arrive and check if there's one or more complete frame get decoded.