Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,47 @@ enum class FilterHeadersStatus {
// FilterDataStatus::Continue from decodeData()/encodeData() or calling
// continueDecoding()/continueEncoding() MUST be called if continued filter iteration is desired.
StopIteration,
// Continue iteration to remaining filters, but ignore any subsequent data or trailers. This
// results in creating a header only request/response.
// Continue headers iteration to remaining filters, but ignore any subsequent data or trailers.
// This results in creating a header only request/response.
// This status MUST NOT be returned by decodeHeaders() when end_stream is set to true.
ContinueAndEndStream,
// Continue headers iteration to remaining filters, but delay ending the stream. This status MUST
// NOT be returned when end_stream is already set to false.
//
// Used when a filter wants to add a body to a headers-only request/response, but this body is not
// readily available. Delaying end_stream allows the filter to add the body once it's available
// without stopping headers iteration.
//
// The filter is responsible to continue the stream by providing a body through calling
// injectDecodedDataToFilterChain()/injectEncodedDataToFilterChain(), possibly multiple times
// if the body needs to be divided into several chunks. The filter may need to handle
// watermark events when injecting a body, see:
// https://github.com/envoyproxy/envoy/blob/master/source/docs/flow_control.md.
//
// The last call to inject data MUST have end_stream set to true to conclude the stream.
// If the filter cannot provide a body the stream should be reset.
//
// Adding a body through calling addDecodedData()/addEncodedData() then
// continueDecoding()/continueEncoding() is currently NOT supported and causes an assert failure.
//
// Adding trailers in this scenario is currently NOT supported.
//
// The filter MUST NOT attempt to continue the stream without providing a body using
// continueDecoding()/continueEncoding().
//
// TODO(yosrym93): Support adding a body in this case by calling addDecodedData()/addEncodedData()
// then continueDecoding()/continueEncoding(). To support this a new FilterManager::IterationState
// needs to be added and set when a filter returns this status in
// FilterManager::decodeHeaders/FilterManager::encodeHeaders()
// Currently, when a filter returns this, the IterationState is Continue. This causes ASSERTs in
// FilterManager::commonContinue() to fail when continueDecoding()/continueEncoding() is called;
// due to trying to continue iteration when the IterationState is already Continue.
// In this case, a different ASSERT will be needed to make sure the filter does not try to
// continue without adding a body first.
//
// TODO(yosrym93): Support adding trailers in this case by implementing new functions to inject
// trailers, similar to the inject data functions.
ContinueAndDontEndStream,
// Do not iterate for headers as well as data and trailers for the current filter and the filters
// following, and buffer body data for later dispatching. ContinueDecoding() MUST
// be called if continued filter iteration is desired.
Expand Down
75 changes: 58 additions & 17 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ void ActiveStreamFilterBase::commonContinue() {

ENVOY_STREAM_LOG(trace, "continuing filter chain: filter={}", *this,
static_cast<const void*>(this));
ASSERT(!canIterate());
ASSERT(!canIterate(),
"Attempting to continue iteration while the IterationState is already Continue");
// If iteration has stopped for all frame types, set iterate_from_current_filter_ to true so the
// filter iteration starts with the current filter instead of the next one.
if (stoppedAll()) {
Expand Down Expand Up @@ -108,24 +109,36 @@ bool ActiveStreamFilterBase::commonHandleAfter100ContinueHeadersCallback(
}

bool ActiveStreamFilterBase::commonHandleAfterHeadersCallback(FilterHeadersStatus status,
bool& end_stream,
bool& headers_only) {
ASSERT(!headers_continued_);
ASSERT(canIterate());

if (status == FilterHeadersStatus::StopIteration) {
switch (status) {
case FilterHeadersStatus::StopIteration:
iteration_state_ = IterationState::StopSingleIteration;
} else if (status == FilterHeadersStatus::StopAllIterationAndBuffer) {
break;
case FilterHeadersStatus::StopAllIterationAndBuffer:
iteration_state_ = IterationState::StopAllBuffer;
} else if (status == FilterHeadersStatus::StopAllIterationAndWatermark) {
break;
case FilterHeadersStatus::StopAllIterationAndWatermark:
iteration_state_ = IterationState::StopAllWatermark;
} else if (status == FilterHeadersStatus::ContinueAndEndStream) {
break;
case FilterHeadersStatus::ContinueAndEndStream:
// Set headers_only to true so we know to end early if necessary,
// but continue filter iteration so we actually write the headers/run the cleanup code.
headers_only = true;
ENVOY_STREAM_LOG(debug, "converting to headers only", parent_);
} else {
ASSERT(status == FilterHeadersStatus::Continue);
break;
case FilterHeadersStatus::ContinueAndDontEndStream:
headers_only = false;
end_stream = false;
headers_continued_ = true;
ENVOY_STREAM_LOG(debug, "converting to headers and body (body not available yet)", parent_);
break;
case FilterHeadersStatus::Continue:
headers_continued_ = true;
break;
}

handleMetadataAfterHeadersCallback();
Expand Down Expand Up @@ -436,11 +449,28 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
(end_stream && continue_data_entry == decoder_filters_.end());
FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_);

ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_));
ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_),
"Filters should not return FilterHeadersStatus::ContinueAndEndStream from decodeHeaders "
"when end_stream is already true");
ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
"Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
"decodeHeaders when end_stream is already false");

state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));

(*entry)->decode_headers_called_ = true;

// decoding_headers_only_ is set if the filter returns ContinueAndEndStream.
const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(
status, end_stream, state_.decoding_headers_only_);

// If this filter ended the stream, decodeComplete() should be called for it.
if ((*entry)->end_stream_ || state_.decoding_headers_only_) {
(*entry)->handle_->decodeComplete();
}

const bool new_metadata_added = processNewlyAddedMetadata();
// If end_stream is set in headers, and a filter adds new metadata, we need to delay end_stream
// in headers by inserting an empty data frame with end_stream set. The empty data frame is sent
Expand All @@ -454,9 +484,7 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
addDecodedData(*((*entry).get()), empty_data, true);
}

(*entry)->decode_headers_called_ = true;
if (!(*entry)->commonHandleAfterHeadersCallback(status, state_.decoding_headers_only_) &&
std::next(entry) != decoder_filters_.end()) {
if (!continue_iteration && std::next(entry) != decoder_filters_.end()) {
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with
// processing since we need to handle the case where a terminal filter wants to buffer, but
// a previous filter has added body.
Expand Down Expand Up @@ -642,7 +670,6 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
if ((*entry)->stoppedAll()) {
return;
}

ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeTrailers));
state_.filter_call_state_ |= FilterCallState::DecodeTrailers;
FilterTrailersStatus status = (*entry)->handle_->decodeTrailers(trailers);
Expand Down Expand Up @@ -905,16 +932,28 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea
(*entry)->end_stream_ = state_.encoding_headers_only_ ||
(end_stream && continue_data_entry == encoder_filters_.end());
FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(headers, (*entry)->end_stream_);
if ((*entry)->end_stream_) {
(*entry)->handle_->encodeComplete();
}

ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_),
"Filters should not return FilterHeadersStatus::ContinueAndEndStream from encodeHeaders "
"when end_stream is already true");
ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
"Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
"encodeHeaders when end_stream is already false");

state_.filter_call_state_ &= ~FilterCallState::EncodeHeaders;
ENVOY_STREAM_LOG(trace, "encode headers called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));

(*entry)->encode_headers_called_ = true;
const auto continue_iteration =
(*entry)->commonHandleAfterHeadersCallback(status, state_.encoding_headers_only_);

// encoding_headers_only_ is set if the filter returns ContinueAndEndStream.
const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(
status, end_stream, state_.encoding_headers_only_);

// If this filter ended the stream, encodeComplete() should be called for it.
if ((*entry)->end_stream_ || state_.encoding_headers_only_) {
(*entry)->handle_->encodeComplete();
}

// If we're encoding a headers only response, then mark the local as complete. This ensures
// that we don't attempt to reset the downstream request in doEndStream.
Expand Down Expand Up @@ -1335,6 +1374,8 @@ void ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data, bool stre

void ActiveStreamEncoderFilter::injectEncodedDataToFilterChain(Buffer::Instance& data,
bool end_stream) {
// TODO(yosrym93): Check if this filter had previously stopped headers iteration.
// If so, it should be continued before injecting data.
parent_.encodeData(this, data, end_stream,
FilterManager::FilterIterationStartState::CanStartFromCurrent);
}
Expand Down
6 changes: 2 additions & 4 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
// corresponding data. Those functions handle state updates and data storage (if needed)
// according to the status returned by filter's callback functions.
bool commonHandleAfter100ContinueHeadersCallback(FilterHeadersStatus status);
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& headers_only);
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& end_stream,
bool& headers_only);
bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data,
bool& buffer_was_streaming);
bool commonHandleAfterTrailersCallback(FilterTrailersStatus status);
Expand Down Expand Up @@ -195,9 +196,6 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase,
FilterHeadersStatus decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
is_grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
FilterHeadersStatus status = handle_->decodeHeaders(headers, end_stream);
if (end_stream) {
handle_->decodeComplete();
}
return status;
}

Expand Down
66 changes: 64 additions & 2 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5179,6 +5179,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamHeaders) {

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
Expand All @@ -5187,7 +5188,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamHeaders) {
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, true);

EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, true))
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true))
Expand All @@ -5199,7 +5200,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamHeaders) {

decoder_filters_[1]->callbacks_->streamInfo().setResponseCodeDetails("");
decoder_filters_[1]->callbacks_->encodeHeaders(
makeHeaderMap<TestResponseHeaderMapImpl>({{":status", "200"}}), true);
makeHeaderMap<TestResponseHeaderMapImpl>({{":status", "200"}}), false);

Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, true);
Expand All @@ -5224,6 +5225,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamData) {

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
Expand All @@ -5234,6 +5236,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamData) {

EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
Expand Down Expand Up @@ -5271,6 +5274,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamTrailers) {

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
Expand All @@ -5281,6 +5285,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamTrailers) {

EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
Expand All @@ -5299,6 +5304,63 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamTrailers) {
decoder_filters_[1]->callbacks_->encodeTrailers(std::move(response_trailers));
}

// Filter continues headers iteration without ending the stream, then injects a body later.
TEST_F(HttpConnectionManagerImplTest, FilterContinueDontEndStreamInjectBody) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status {
RequestDecoder* decoder = &conn_manager_->newStream(response_encoder_);
auto headers = makeHeaderMap<TestRequestHeaderMapImpl>(
{{":authority", "host"}, {":path", "/"}, {":method", "GET"}});
decoder->decodeHeaders(std::move(headers), true);
return Http::okStatus();
}));

setupFilterChain(2, 2);

// Decode filter 0 changes end_stream to false.
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::ContinueAndDontEndStream));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, true);

EXPECT_CALL(*decoder_filters_[1], decodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());

// Decode filter 0 injects request body later.
Buffer::OwnedImpl data("hello");
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(data, true);

// Encode filter 1 changes end_stream to false.
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::ContinueAndDontEndStream));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));

decoder_filters_[1]->callbacks_->streamInfo().setResponseCodeDetails("");
decoder_filters_[1]->callbacks_->encodeHeaders(
makeHeaderMap<TestResponseHeaderMapImpl>({{":status", "200"}}), true);

EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeData(_, true));
expectOnDestroy();

// Encode filter 1 injects request body later.
Buffer::OwnedImpl data2("hello");
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(data2, true);
}

TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) {
InSequence s;
setup(false, "");
Expand Down
1 change: 1 addition & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ envoy_cc_test(
"//source/common/http:header_map_lib",
"//source/extensions/filters/http/buffer:config",
"//source/extensions/filters/http/health_check:config",
"//test/integration/filters:continue_headers_only_inject_body",
"//test/integration/filters:encoder_decoder_buffer_filter_lib",
"//test/integration/filters:random_pause_filter_lib",
"//test/test_common:utility_lib",
Expand Down
15 changes: 15 additions & 0 deletions test/integration/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "continue_headers_only_inject_body",
srcs = [
"continue_headers_only_inject_body_filter.cc",
],
deps = [
":common_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
"//include/envoy/server:filter_config_interface",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//test/extensions/filters/http/common:empty_http_filter_config_lib",
],
)

envoy_cc_test_library(
name = "wait_for_whole_request_and_response_config_lib",
srcs = [
Expand Down
Loading