Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,38 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* It is an error to call this method in any other case.
*
* See also injectDecodedDataToFilterChain() for a different way of passing data to further
* filters and also how the two methods are different.
*
* @param data Buffer::Instance supplies the data to be decoded.
* @param streaming_filter boolean supplies if this filter streams data or buffers the full body.
*/
virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Decode data directly to subsequent filters in the filter chain. This method is used in
* advanced cases in which a filter needs full control over how subsequent filters view data,
* and does not want to make use of HTTP connection manager buffering. Using this method allows
* a filter to buffer data (or not) and then periodically inject data to subsequent filters,
* indicating end_stream at an appropriate time. This can be used to implement rate limiting,
* periodic data emission, etc.
*
* This method should only be called outside of callback context. I.e., do not call this method
* from within a filter's decodeData() call.
*
* When using this callback, filters should generally only return
* FilterDataStatus::StopIterationNoBuffer from their decodeData() call, since use of this method
* indicates that a filter does not wish to participate in standard HTTP connection manager
* buffering and continuation and will perform any necessary buffering and continuation on its
* own.
*
* This callback is different from addDecodedData() in that the specified data and end_stream
* status will be propagated directly to further filters in the filter chain. This is different
* from addDecodedData() where data is added to the HTTP connection manager's buffered data with
* the assumption that standard HTTP connection manager buffering and continuation are being used.
*/
virtual void injectDecodedDataToFilterChain(Buffer::Instance& data, bool end_stream) PURE;

/**
* Adds decoded trailers. May only be called in decodeData when end_stream is set to true.
* If called in any other context, an assertion will be triggered.
Expand Down Expand Up @@ -469,11 +496,38 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* It is an error to call this method in any other case.
*
* See also injectEncodedDataToFilterChain() for a different way of passing data to further
* filters and also how the two methods are different.
*
* @param data Buffer::Instance supplies the data to be encoded.
* @param streaming_filter boolean supplies if this filter streams data or buffers the full body.
*/
virtual void addEncodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Encode data directly to subsequent filters in the filter chain. This method is used in
* advanced cases in which a filter needs full control over how subsequent filters view data,
* and does not want to make use of HTTP connection manager buffering. Using this method allows
* a filter to buffer data (or not) and then periodically inject data to subsequent filters,
* indicating end_stream at an appropriate time. This can be used to implement rate limiting,
* periodic data emission, etc.
*
* This method should only be called outside of callback context. I.e., do not call this method
* from within a filter's encodeData() call.
*
* When using this callback, filters should generally only return
* FilterDataStatus::StopIterationNoBuffer from their encodeData() call, since use of this method
* indicates that a filter does not wish to participate in standard HTTP connection manager
* buffering and continuation and will perform any necessary buffering and continuation on its
* own.
*
* This callback is different from addEncodedData() in that the specified data and end_stream
* status will be propagated directly to further filters in the filter chain. This is different
* from addEncodedData() where data is added to the HTTP connection manager's buffered data with
* the assumption that standard HTTP connection manager buffering and continuation are being used.
*/
virtual void injectEncodedDataToFilterChain(Buffer::Instance& data, bool end_stream) PURE;

/**
* Adds encoded trailers. May only be called in encodeData when end_stream is set to true.
* If called in any other context, an assertion will be triggered.
Expand Down Expand Up @@ -517,7 +571,7 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
class StreamEncoderFilter : public StreamFilterBase {
public:
/*
/**
* Called with 100-continue headers.
*
* This is not folded into encodeHeaders because most Envoy users and filters
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ class AsyncStreamImpl : public AsyncClient::Stream,
// filter which uses this function for buffering.
ASSERT(buffered_body_ != nullptr);
}
void injectDecodedDataToFilterChain(Buffer::Instance&, bool) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
Expand Down
10 changes: 10 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,11 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::In
parent_.addDecodedData(*this, data, streaming);
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::injectDecodedDataToFilterChain(
Buffer::Instance& data, bool end_stream) {
parent_.decodeData(this, data, end_stream);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this require some extra work on the StopAllIteration PR? If so, anything we can do here to reduce merge complexity?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should cause any issues/conflicts, other than maybe wanting some more asserts on when this shouldn't be called. @soya3129 WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree some conditions/checks may be needed for StopAllIteration case. For example, checking if the current filter returns StopAllIteration for headers. Maybe add a more detailed comment about when injectDecodedDataToFilterChain() should be called? For example, injectDecodedDataToFilterChain() can be called only when iteration for data on this filter is not stopped(?).

What will happen if one subsequent filter stops all iteration for headers? Do we need to buffer the injected data somewhere in case buffer is required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, injectDecodedDataToFilterChain() can be called only when iteration for data on this filter is not stopped(?).

It should still work if headers was stopped and even if the data was previously buffered. Do you still think the comments need updating with the new version?

What will happen if one subsequent filter stops all iteration for headers? Do we need to buffer the injected data somewhere in case buffer is required?

This should still work. If the next filter in the filter chain decides to buffer data it should work fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Got it. Thanks!

}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::continueDecoding() { commonContinue(); }

void ConnectionManagerImpl::ActiveStreamDecoderFilter::encode100ContinueHeaders(
Expand Down Expand Up @@ -1850,6 +1855,11 @@ void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::In
return parent_.addEncodedData(*this, data, streaming);
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::injectEncodedDataToFilterChain(
Buffer::Instance& data, bool end_stream) {
parent_.encodeData(this, data, end_stream);
}

HeaderMap& ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedTrailers() {
return parent_.addEncodedTrailers();
}
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamDecoderFilterCallbacks
void addDecodedData(Buffer::Instance& data, bool streaming) override;
void injectDecodedDataToFilterChain(Buffer::Instance& data, bool end_stream) override;
HeaderMap& addDecodedTrailers() override;
void continueDecoding() override;
const Buffer::Instance* decodingBuffer() override {
Expand Down Expand Up @@ -253,6 +254,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamEncoderFilterCallbacks
void addEncodedData(Buffer::Instance& data, bool streaming) override;
void injectEncodedDataToFilterChain(Buffer::Instance& data, bool end_stream) override;
HeaderMap& addEncodedTrailers() override;
void onEncoderFilterAboveWriteBufferHighWatermark() override;
void onEncoderFilterBelowWriteBufferLowWatermark() override;
Expand Down
174 changes: 174 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3422,6 +3422,180 @@ TEST_F(HttpConnectionManagerImplTest, AddDataWithStopAndContinue) {
encoder_filters_[2]->callbacks_->continueEncoding();
}

// Use filter direct decode/encodeData() calls without trailers.
TEST_F(HttpConnectionManagerImplTest, FilterDirectDecodeEncodeDataNoTrailers) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data("hello");
decoder->decodeData(fake_data, true);
}));

EXPECT_CALL(*route_config_provider_.route_config_, route(_, _));
setupFilterChain(2, 2);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl decode_buffer;
EXPECT_CALL(*decoder_filters_[0], decodeData(_, true))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
decode_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

Buffer::OwnedImpl decoded_data_to_forward;
decoded_data_to_forward.move(decode_buffer, 2);
EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("he"), false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(decoded_data_to_forward, false);

EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("llo"), true))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(decode_buffer, true);

// Response path.
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));

Buffer::OwnedImpl encoder_buffer;
EXPECT_CALL(*encoder_filters_[1], encodeData(_, true))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
encoder_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());

decoder_filters_[1]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);
Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, true);

Buffer::OwnedImpl encoded_data_to_forward;
encoded_data_to_forward.move(encoder_buffer, 3);
EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("res"), false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(encoded_data_to_forward, false);

EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("ponse"), true));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeData(_, true));
expectOnDestroy();
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(encoder_buffer, true);
}

// Use filter direct decode/encodeData() calls with trailers.
TEST_F(HttpConnectionManagerImplTest, FilterDirectDecodeEncodeDataTrailers) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data("hello");
decoder->decodeData(fake_data, false);

HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}};
decoder->decodeTrailers(std::move(trailers));
}));

EXPECT_CALL(*route_config_provider_.route_config_, route(_, _));
setupFilterChain(2, 2);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl decode_buffer;
EXPECT_CALL(*decoder_filters_[0], decodeData(_, false))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
decode_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

Buffer::OwnedImpl decoded_data_to_forward;
decoded_data_to_forward.move(decode_buffer, 2);
EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("he"), false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(decoded_data_to_forward, false);

EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("llo"), false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(decode_buffer, false);

EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
decoder_filters_[0]->callbacks_->continueDecoding();

// Response path.
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));

Buffer::OwnedImpl encoder_buffer;
EXPECT_CALL(*encoder_filters_[1], encodeData(_, false))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
encoder_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*encoder_filters_[1], encodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());

decoder_filters_[1]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);
Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, false);
decoder_filters_[1]->callbacks_->encodeTrailers(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}});

Buffer::OwnedImpl encoded_data_to_forward;
encoded_data_to_forward.move(encoder_buffer, 3);
EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("res"), false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(encoded_data_to_forward, false);

EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("ponse"), false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(encoder_buffer, false);

EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeTrailers(_));
expectOnDestroy();
encoder_filters_[1]->callbacks_->continueEncoding();
}

TEST_F(HttpConnectionManagerImplTest, MultipleFilters) {
InSequence s;
setup(false, "");
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,

MOCK_METHOD0(continueDecoding, void());
MOCK_METHOD2(addDecodedData, void(Buffer::Instance& data, bool streaming));
MOCK_METHOD2(injectDecodedDataToFilterChain, void(Buffer::Instance& data, bool end_stream));
MOCK_METHOD0(addDecodedTrailers, HeaderMap&());
MOCK_METHOD0(decodingBuffer, const Buffer::Instance*());
MOCK_METHOD1(modifyDecodingBuffer, void(std::function<void(Buffer::Instance&)>));
Expand Down Expand Up @@ -219,6 +220,7 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks,

// Http::StreamEncoderFilterCallbacks
MOCK_METHOD2(addEncodedData, void(Buffer::Instance& data, bool streaming));
MOCK_METHOD2(injectEncodedDataToFilterChain, void(Buffer::Instance& data, bool end_stream));
MOCK_METHOD0(addEncodedTrailers, HeaderMap&());
MOCK_METHOD0(continueEncoding, void());
MOCK_METHOD0(encodingBuffer, const Buffer::Instance*());
Expand Down