Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ enum class FilterHeadersStatus {
// results in creating a header only request/response.
// This status MUST NOT be returned by decodeHeaders() when end_stream is set to true.
ContinueAndEndStream,
// Continue iteration to remaining filters, but do not end the stream.
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
//
// Used when a filter wants to add a body to headers-only request/response, but this body is not
// readily available.
// This causes the headers iteration to continue, but the stream does not end.
// The filter is responsible to continue the stream by providing a body through
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
// injectEncodedDataToFilterChain()/injectDecodedDataToFilterChain().
// If the filter cannot provide a body the stream should be reset.
//
// Adding a body through calling addDecodedData()/addEncodedData() then
// continueDecoding()/continueEncoding() is currently not supported and causes an assert failure.
//
// TODO(yosrym93): Support adding a body in this case by calling addDecodedData()/addEncodedData()
// then continueDecoding()/continueEncoding(). To support this a new FilterManager::IterationState
// needs to be added and set when a filter returns this status in FilterManager::encodeHeaders.
// Currently, when a filter returns this, the IterationState is Continue. This causes ASSERTs at
// FilterManager::commonContinue() to fail when continueDecoding()/continueEncoding() is called;
// due to trying to continue iteration when the IterationState is already Continue.
ContinueAndDontEndStream,
Comment thread
yosrym93 marked this conversation as resolved.
// Do not iterate for headers as well as data and trailers for the current filter and the filters
// following, and buffer body data for later dispatching. ContinueDecoding() MUST
// be called if continued filter iteration is desired.
Expand Down
54 changes: 43 additions & 11 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ void ActiveStreamFilterBase::commonContinue() {

ENVOY_STREAM_LOG(trace, "continuing filter chain: filter={}", *this,
static_cast<const void*>(this));
ASSERT(!canIterate());
ASSERT(!canIterate(),
"Attempting to continue iteration while the IterationState is already Continue");
// If iteration has stopped for all frame types, set iterate_from_current_filter_ to true so the
// filter iteration starts with the current filter instead of the next one.
if (stoppedAll()) {
Expand Down Expand Up @@ -108,24 +109,38 @@ bool ActiveStreamFilterBase::commonHandleAfter100ContinueHeadersCallback(
}

bool ActiveStreamFilterBase::commonHandleAfterHeadersCallback(FilterHeadersStatus status,
bool& end_stream,
bool& headers_only) {
ASSERT(!headers_continued_);
ASSERT(canIterate());

if (status == FilterHeadersStatus::StopIteration) {
switch (status) {
case FilterHeadersStatus::StopIteration:
iteration_state_ = IterationState::StopSingleIteration;
} else if (status == FilterHeadersStatus::StopAllIterationAndBuffer) {
break;
case FilterHeadersStatus::StopAllIterationAndBuffer:
iteration_state_ = IterationState::StopAllBuffer;
} else if (status == FilterHeadersStatus::StopAllIterationAndWatermark) {
break;
case FilterHeadersStatus::StopAllIterationAndWatermark:
iteration_state_ = IterationState::StopAllWatermark;
} else if (status == FilterHeadersStatus::ContinueAndEndStream) {
break;
case FilterHeadersStatus::ContinueAndEndStream:
// Set headers_only to true so we know to end early if necessary,
// but continue filter iteration so we actually write the headers/run the cleanup code.
headers_only = true;
ENVOY_STREAM_LOG(debug, "converting to headers only", parent_);
} else {
ASSERT(status == FilterHeadersStatus::Continue);
break;
case FilterHeadersStatus::ContinueAndDontEndStream:
headers_only = false;
end_stream = false;
headers_continued_ = true;
ENVOY_STREAM_LOG(debug, "converting to headers and body (body not available yet)", parent_);
break;
case FilterHeadersStatus::Continue:
headers_continued_ = true;
break;
default:
ASSERT(false, "Unrecognized FilterHeadersStatus");
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
}

handleMetadataAfterHeadersCallback();
Expand Down Expand Up @@ -434,7 +449,13 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
(end_stream && continue_data_entry == decoder_filters_.end());
FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_);

ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_));
ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_),
"Filters should not return FilterHeadersStatus::ContinueAndEndStream from decodeHeaders "
"when end_stream is already true");
ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
"Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
"decodeHeaders when end_stream is already false");

state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
Expand All @@ -453,7 +474,8 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
}

(*entry)->decode_headers_called_ = true;
if (!(*entry)->commonHandleAfterHeadersCallback(status, state_.decoding_headers_only_) &&
if (!(*entry)->commonHandleAfterHeadersCallback(status, end_stream,
state_.decoding_headers_only_) &&
std::next(entry) != decoder_filters_.end()) {
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with
// processing since we need to handle the case where a terminal filter wants to buffer, but
Expand Down Expand Up @@ -902,6 +924,14 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea
(*entry)->end_stream_ = state_.encoding_headers_only_ ||
(end_stream && continue_data_entry == encoder_filters_.end());
FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(headers, (*entry)->end_stream_);

ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_),
"Filters should not return FilterHeadersStatus::ContinueAndEndStream from encodeHeaders "
"when end_stream is already true");
ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
"Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
"encodeHeaders when end_stream is already false");

if ((*entry)->end_stream_) {
(*entry)->handle_->encodeComplete();
}
Expand All @@ -910,8 +940,8 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));

(*entry)->encode_headers_called_ = true;
const auto continue_iteration =
(*entry)->commonHandleAfterHeadersCallback(status, state_.encoding_headers_only_);
const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(
status, end_stream, state_.encoding_headers_only_);

// If we're encoding a headers only response, then mark the local as complete. This ensures
// that we don't attempt to reset the downstream request in doEndStream.
Expand Down Expand Up @@ -1331,6 +1361,8 @@ void ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data, bool stre

void ActiveStreamEncoderFilter::injectEncodedDataToFilterChain(Buffer::Instance& data,
bool end_stream) {
// TODO(yosrym93): Check if this filter had previously stopped headers iteration.
// If so, it should be continued before injecting data.
parent_.encodeData(this, data, end_stream,
FilterManager::FilterIterationStartState::CanStartFromCurrent);
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
// corresponding data. Those functions handle state updates and data storage (if needed)
// according to the status returned by filter's callback functions.
bool commonHandleAfter100ContinueHeadersCallback(FilterHeadersStatus status);
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& headers_only);
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& end_stream,
bool& headers_only);
bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data,
bool& buffer_was_streaming);
bool commonHandleAfterTrailersCallback(FilterTrailersStatus status);
Expand Down
62 changes: 59 additions & 3 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5070,9 +5070,8 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamHeaders) {
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, true);

EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, true))
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
Comment thread
yosrym93 marked this conversation as resolved.
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());
Comment thread
yosrym93 marked this conversation as resolved.
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
Expand All @@ -5082,7 +5081,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamHeaders) {

decoder_filters_[1]->callbacks_->streamInfo().setResponseCodeDetails("");
decoder_filters_[1]->callbacks_->encodeHeaders(
makeHeaderMap<TestResponseHeaderMapImpl>({{":status", "200"}}), true);
makeHeaderMap<TestResponseHeaderMapImpl>({{":status", "200"}}), false);

Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, true);
Expand Down Expand Up @@ -5182,6 +5181,63 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamTrailers) {
decoder_filters_[1]->callbacks_->encodeTrailers(std::move(response_trailers));
}

// filter continues headers iteration without ending the stream, then injects a body later.
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
TEST_F(HttpConnectionManagerImplTest, FilterContinueDontEndStreamInjectBody) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status {
RequestDecoder* decoder = &conn_manager_->newStream(response_encoder_);
auto headers = makeHeaderMap<TestRequestHeaderMapImpl>(
{{":authority", "host"}, {":path", "/"}, {":method", "GET"}});
decoder->decodeHeaders(std::move(headers), true);
return Http::okStatus();
}));

setupFilterChain(2, 2);

// Decode filter 0 changes end_stream to false.
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::ContinueAndDontEndStream));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, true);

EXPECT_CALL(*decoder_filters_[1], decodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());

// Decode filter 0 injects request body later.
Buffer::OwnedImpl data("hello");
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(data, true);

// Encode filter 1 changes end_stream to false.
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::ContinueAndDontEndStream));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));

decoder_filters_[1]->callbacks_->streamInfo().setResponseCodeDetails("");
decoder_filters_[1]->callbacks_->encodeHeaders(
makeHeaderMap<TestResponseHeaderMapImpl>({{":status", "200"}}), true);

EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeData(_, true));
expectOnDestroy();

// Encode filter 1 injects request body later.
Buffer::OwnedImpl data2("hello");
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(data2, true);
}

TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) {
InSequence s;
setup(false, "");
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ envoy_cc_test(
"//source/extensions/filters/http/buffer:config",
"//source/extensions/filters/http/health_check:config",
"//test/common/http/http2:http2_frame",
"//test/integration/filters:continue_headers_only_inject_body",
"//test/integration/filters:metadata_stop_all_filter_config_lib",
"//test/integration/filters:request_metadata_filter_config_lib",
"//test/integration/filters:response_metadata_filter_config_lib",
Expand Down
15 changes: 15 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "continue_headers_only_inject_body",
srcs = [
"continue_headers_only_inject_body_filter.cc",
],
deps = [
":common_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)

envoy_cc_test_library(
name = "wait_for_whole_request_and_response_config_lib",
srcs = [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include <string>

#include "envoy/http/filter.h"
#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

#include "common/buffer/buffer_impl.h"

#include "extensions/filters/http/common/pass_through_filter.h"

#include "test/extensions/filters/http/common/empty_http_filter_config.h"
#include "test/integration/filters/common.h"

namespace Envoy {

// A test filter that continues iteration of headers-only request/response without ending the
// stream, then injects a body later.
class ContinueHeadersOnlyInjectBodyFilter : public Http::PassThroughFilter {
public:
constexpr static char name[] = "continue-headers-only-inject-body-filter";

Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers, bool) override {
headers.setContentLength(body_.length());
decoder_callbacks_->dispatcher().post([this]() -> void {
Buffer::OwnedImpl buffer(body_);
decoder_callbacks_->injectDecodedDataToFilterChain(buffer, true);
});
return Http::FilterHeadersStatus::ContinueAndDontEndStream;
}

Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers, bool) override {
headers.setContentLength(body_.length());
encoder_callbacks_->dispatcher().post([this]() -> void {
Buffer::OwnedImpl buffer(body_);
encoder_callbacks_->injectEncodedDataToFilterChain(buffer, true);
});
return Http::FilterHeadersStatus::ContinueAndDontEndStream;
}

private:
std::string body_ = "body";
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
};

static Registry::RegisterFactory<SimpleFilterConfig<ContinueHeadersOnlyInjectBodyFilter>,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;
} // namespace Envoy
28 changes: 28 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,34 @@ TEST_P(Http2IntegrationTest, PauseAndResumeHeadersOnly) {
ASSERT_TRUE(response->complete());
}

TEST_P(Http2IntegrationTest, ContinueHeadersOnlyInjectBodyFilter) {
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
config_helper_.addFilter(R"EOF(
name: continue-headers-only-inject-body-filter
typed_config:
"@type": type.googleapis.com/google.protobuf.Empty
)EOF");
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));

// Send a headers only request.
auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
waitForNextUpstreamRequest();

// Make sure that the body was injected to the request.
EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(upstream_request_->receivedData());
EXPECT_EQ(upstream_request_->body().toString(), "body");

// Send a headers only response.
upstream_request_->encodeHeaders(default_response_headers_, true);
response->waitForEndStream();

// Make sure that the body was injected to the response.
EXPECT_TRUE(response->complete());
EXPECT_EQ(response->body(), "body");
}

// Verify the case when we have large pending data with empty trailers. It should not introduce
// stack-overflow (on ASan build). This is a regression test for
// https://bugs.chromium.org/p/oss-fuzz/issues/detail?id=24714.
Expand Down