Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ enum class FilterHeadersStatus {
// results in creating a header only request/response.
// This status MUST NOT be returned by decodeHeaders() when end_stream is set to true.
ContinueAndEndStream,
// Continue iteration to remaining filters, but do not end the stream.
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
//
// Used when a filter wants to add a body to headers-only request/response, but this body is not
// readily available.
// This causes the headers iteration to continue, but the stream does not end.
// The filter is responsible to continue the stream by providing a body through
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
// injectEncodedDataToFilterChain()/injectDecodedDataToFilterChain().
// If the filter cannot provide a body the stream should be reset.
ContinueAndDontEndStream,
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
Comment thread
yosrym93 marked this conversation as resolved.
// Do not iterate for headers as well as data and trailers for the current filter and the filters
// following, and buffer body data for later dispatching. ContinueDecoding() MUST
// be called if continued filter iteration is desired.
Expand Down
15 changes: 12 additions & 3 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ bool ActiveStreamFilterBase::commonHandleAfter100ContinueHeadersCallback(
}

bool ActiveStreamFilterBase::commonHandleAfterHeadersCallback(FilterHeadersStatus status,
bool& end_stream,
bool& headers_only) {
ASSERT(!headers_continued_);
ASSERT(canIterate());
Expand All @@ -123,6 +124,11 @@ bool ActiveStreamFilterBase::commonHandleAfterHeadersCallback(FilterHeadersStatu
// but continue filter iteration so we actually write the headers/run the cleanup code.
headers_only = true;
ENVOY_STREAM_LOG(debug, "converting to headers only", parent_);
} else if (status == FilterHeadersStatus::ContinueAndDontEndStream) {
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
headers_only = false;
end_stream = false;
headers_continued_ = true;
ENVOY_STREAM_LOG(debug, "converting to headers and body (body not available yet)", parent_);
} else {
ASSERT(status == FilterHeadersStatus::Continue);
headers_continued_ = true;
Expand Down Expand Up @@ -453,7 +459,8 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
}

(*entry)->decode_headers_called_ = true;
if (!(*entry)->commonHandleAfterHeadersCallback(status, state_.decoding_headers_only_) &&
if (!(*entry)->commonHandleAfterHeadersCallback(status, end_stream,
state_.decoding_headers_only_) &&
std::next(entry) != decoder_filters_.end()) {
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with
// processing since we need to handle the case where a terminal filter wants to buffer, but
Expand Down Expand Up @@ -910,8 +917,8 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));

(*entry)->encode_headers_called_ = true;
const auto continue_iteration =
(*entry)->commonHandleAfterHeadersCallback(status, state_.encoding_headers_only_);
const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(
status, end_stream, state_.encoding_headers_only_);

// If we're encoding a headers only response, then mark the local as complete. This ensures
// that we don't attempt to reset the downstream request in doEndStream.
Expand Down Expand Up @@ -1331,6 +1338,8 @@ void ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data, bool stre

void ActiveStreamEncoderFilter::injectEncodedDataToFilterChain(Buffer::Instance& data,
bool end_stream) {
// TODO(yosrym93): Check if this filter had previously stopped headers iteration.
// If so, it should be continued before injecting data.
parent_.encodeData(this, data, end_stream,
FilterManager::FilterIterationStartState::CanStartFromCurrent);
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks,
// corresponding data. Those functions handle state updates and data storage (if needed)
// according to the status returned by filter's callback functions.
bool commonHandleAfter100ContinueHeadersCallback(FilterHeadersStatus status);
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& headers_only);
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& end_stream,
bool& headers_only);
bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data,
bool& buffer_was_streaming);
bool commonHandleAfterTrailersCallback(FilterTrailersStatus status);
Expand Down
57 changes: 57 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5182,6 +5182,63 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamTrailers) {
decoder_filters_[1]->callbacks_->encodeTrailers(std::move(response_trailers));
}

// filter continues headers iteration without ending the stream, then injects a body later.
Comment thread
yosrym93 marked this conversation as resolved.
Outdated
TEST_F(HttpConnectionManagerImplTest, FilterContinueDontEndStreamInjectBody) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status {
RequestDecoder* decoder = &conn_manager_->newStream(response_encoder_);
auto headers = makeHeaderMap<TestRequestHeaderMapImpl>(
{{":authority", "host"}, {":path", "/"}, {":method", "GET"}});
decoder->decodeHeaders(std::move(headers), true);
return Http::okStatus();
}));

setupFilterChain(2, 2);

// Decode filter 0 changes end_stream to false.
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::ContinueAndDontEndStream));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, true);

EXPECT_CALL(*decoder_filters_[1], decodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());

// Decode filter 0 injects request body later.
Buffer::OwnedImpl data("hello");
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(data, true);

// Encode filter 1 changes end_stream to false.
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, true))
.WillOnce(Return(FilterHeadersStatus::ContinueAndDontEndStream));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));

decoder_filters_[1]->callbacks_->streamInfo().setResponseCodeDetails("");
decoder_filters_[1]->callbacks_->encodeHeaders(
makeHeaderMap<TestResponseHeaderMapImpl>({{":status", "200"}}), true);

EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeData(_, true));
expectOnDestroy();

// Encode filter 1 injects request body later.
Buffer::OwnedImpl data2("hello");
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(data2, true);
}

TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) {
InSequence s;
setup(false, "");
Expand Down