Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,19 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Decode data directly to subsequent filters in the filter chain. This method is used in
* advanced cases in which a filter needs full control over how subsequent filters view data,
* and does not want to make use of HTTP connection manager buffering. Using this method allows
Comment thread
mattklein123 marked this conversation as resolved.
* a filter to buffer data (or not) and then periodically inject data to subsequent filters,
* indicating end_stream at an appropriate time. This can be used to implement rate limiting,
* periodic data emission, etc.
*
* This method should only be called outside of callback context. I.e., do not call this method
* from within a filter's decodeData() call.
*/
virtual void decodeData(Buffer::Instance& data, bool end_stream) PURE;

/**
* Adds decoded trailers. May only be called in decodeData when end_stream is set to true.
* If called in any other context, an assertion will be triggered.
Expand Down Expand Up @@ -474,6 +487,19 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
virtual void addEncodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Encode data directly to subsequent filters in the filter chain. This method is used in
* advanced cases in which a filter needs full control over how subsequent filters view data,
* and does not want to make use of HTTP connection manager buffering. Using this method allows
* a filter to buffer data (or not) and then periodically inject data to subsequent filters,
* indicating end_stream at an appropriate time. This can be used to implement rate limiting,
* periodic data emission, etc.
*
* This method should only be called outside of callback context. I.e., do not call this method
* from within a filter's encodeData() call.
*/
virtual void encodeData(Buffer::Instance& data, bool end_stream) PURE;

/**
* Adds encoded trailers. May only be called in encodeData when end_stream is set to true.
* If called in any other context, an assertion will be triggered.
Expand Down Expand Up @@ -517,7 +543,7 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
class StreamEncoderFilter : public StreamFilterBase {
public:
/*
/**
* Called with 100-continue headers.
*
* This is not folded into encodeHeaders because most Envoy users and filters
Expand Down
1 change: 1 addition & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
// filter which uses this function for buffering.
ASSERT(buffered_body_ != nullptr);
}
void decodeData(Buffer::Instance&, bool) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
Expand Down
10 changes: 10 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,11 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::In
parent_.addDecodedData(*this, data, streaming);
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::decodeData(Buffer::Instance& data,
bool end_stream) {
parent_.decodeData(this, data, end_stream);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this require some extra work on the StopAllIteration PR? If so, anything we can do here to reduce merge complexity?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should cause any issues/conflicts, other than maybe wanting some more asserts on when this shouldn't be called. @soya3129 WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree some conditions/checks may be needed for StopAllIteration case. For example, checking if the current filter returns StopAllIteration for headers. Maybe add a more detailed comment about when injectDecodedDataToFilterChain() should be called? For example, injectDecodedDataToFilterChain() can be called only when iteration for data on this filter is not stopped(?).

What will happen if one subsequent filter stops all iteration for headers? Do we need to buffer the injected data somewhere in case buffer is required?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, injectDecodedDataToFilterChain() can be called only when iteration for data on this filter is not stopped(?).

It should still work if headers was stopped and even if the data was previously buffered. Do you still think the comments need updating with the new version?

What will happen if one subsequent filter stops all iteration for headers? Do we need to buffer the injected data somewhere in case buffer is required?

This should still work. If the next filter in the filter chain decides to buffer data it should work fine.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Got it. Thanks!

}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::continueDecoding() { commonContinue(); }

void ConnectionManagerImpl::ActiveStreamDecoderFilter::encode100ContinueHeaders(
Expand Down Expand Up @@ -1850,6 +1855,11 @@ void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::In
return parent_.addEncodedData(*this, data, streaming);
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::encodeData(Buffer::Instance& data,
bool end_stream) {
parent_.encodeData(this, data, end_stream);
}

HeaderMap& ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedTrailers() {
return parent_.addEncodedTrailers();
}
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamDecoderFilterCallbacks
void addDecodedData(Buffer::Instance& data, bool streaming) override;
void decodeData(Buffer::Instance& data, bool end_stream) override;
HeaderMap& addDecodedTrailers() override;
void continueDecoding() override;
const Buffer::Instance* decodingBuffer() override {
Expand Down Expand Up @@ -253,6 +254,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamEncoderFilterCallbacks
void addEncodedData(Buffer::Instance& data, bool streaming) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
HeaderMap& addEncodedTrailers() override;
void onEncoderFilterAboveWriteBufferHighWatermark() override;
void onEncoderFilterBelowWriteBufferLowWatermark() override;
Expand Down
174 changes: 174 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3422,6 +3422,180 @@ TEST_F(HttpConnectionManagerImplTest, AddDataWithStopAndContinue) {
encoder_filters_[2]->callbacks_->continueEncoding();
}

// Use filter direct decode/encodeData() calls without trailers.
TEST_F(HttpConnectionManagerImplTest, FilterDirectDecodeEncodeDataNoTrailers) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data("hello");
decoder->decodeData(fake_data, true);
}));

EXPECT_CALL(*route_config_provider_.route_config_, route(_, _));
setupFilterChain(2, 2);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl decode_buffer;
EXPECT_CALL(*decoder_filters_[0], decodeData(_, true))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
decode_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

Buffer::OwnedImpl decoded_data_to_forward;
decoded_data_to_forward.move(decode_buffer, 2);
EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("he"), false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
decoder_filters_[0]->callbacks_->decodeData(decoded_data_to_forward, false);

EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("llo"), true))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
decoder_filters_[0]->callbacks_->decodeData(decode_buffer, true);

// Response path.
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));

Buffer::OwnedImpl encoder_buffer;
EXPECT_CALL(*encoder_filters_[1], encodeData(_, true))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
encoder_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());

decoder_filters_[1]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);
Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, true);

Buffer::OwnedImpl encoded_data_to_forward;
encoded_data_to_forward.move(encoder_buffer, 3);
EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("res"), false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
encoder_filters_[1]->callbacks_->encodeData(encoded_data_to_forward, false);

EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("ponse"), true));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeData(_, true));
expectOnDestroy();
encoder_filters_[1]->callbacks_->encodeData(encoder_buffer, true);
}

// Use filter direct decode/encodeData() calls with trailers.
TEST_F(HttpConnectionManagerImplTest, FilterDirectDecodeEncodeDataTrailers) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data("hello");
decoder->decodeData(fake_data, false);

HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}};
decoder->decodeTrailers(std::move(trailers));
}));

EXPECT_CALL(*route_config_provider_.route_config_, route(_, _));
setupFilterChain(2, 2);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl decode_buffer;
EXPECT_CALL(*decoder_filters_[0], decodeData(_, false))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
decode_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

Buffer::OwnedImpl decoded_data_to_forward;
decoded_data_to_forward.move(decode_buffer, 2);
EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("he"), false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
decoder_filters_[0]->callbacks_->decodeData(decoded_data_to_forward, false);

EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("llo"), false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
decoder_filters_[0]->callbacks_->decodeData(decode_buffer, false);

EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
decoder_filters_[0]->callbacks_->continueDecoding();

// Response path.
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));

Buffer::OwnedImpl encoder_buffer;
EXPECT_CALL(*encoder_filters_[1], encodeData(_, false))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
encoder_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*encoder_filters_[1], encodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());

decoder_filters_[1]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);
Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, false);
decoder_filters_[1]->callbacks_->encodeTrailers(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}});

Buffer::OwnedImpl encoded_data_to_forward;
encoded_data_to_forward.move(encoder_buffer, 3);
EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("res"), false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
encoder_filters_[1]->callbacks_->encodeData(encoded_data_to_forward, false);

EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("ponse"), false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
encoder_filters_[1]->callbacks_->encodeData(encoder_buffer, false);

EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeTrailers(_));
expectOnDestroy();
encoder_filters_[1]->callbacks_->continueEncoding();
}

TEST_F(HttpConnectionManagerImplTest, MultipleFilters) {
InSequence s;
setup(false, "");
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,

MOCK_METHOD0(continueDecoding, void());
MOCK_METHOD2(addDecodedData, void(Buffer::Instance& data, bool streaming));
MOCK_METHOD2(decodeData, void(Buffer::Instance& data, bool end_stream));
MOCK_METHOD0(addDecodedTrailers, HeaderMap&());
MOCK_METHOD0(decodingBuffer, const Buffer::Instance*());
MOCK_METHOD1(modifyDecodingBuffer, void(std::function<void(Buffer::Instance&)>));
Expand Down Expand Up @@ -219,6 +220,7 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks,

// Http::StreamEncoderFilterCallbacks
MOCK_METHOD2(addEncodedData, void(Buffer::Instance& data, bool streaming));
MOCK_METHOD2(encodeData, void(Buffer::Instance& data, bool end_stream));
MOCK_METHOD0(addEncodedTrailers, HeaderMap&());
MOCK_METHOD0(continueEncoding, void());
MOCK_METHOD0(encodingBuffer, const Buffer::Instance*());
Expand Down