diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 3e855e99f6fc7..1bb611a2a1b14 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -32,10 +32,47 @@ enum class FilterHeadersStatus { // FilterDataStatus::Continue from decodeData()/encodeData() or calling // continueDecoding()/continueEncoding() MUST be called if continued filter iteration is desired. StopIteration, - // Continue iteration to remaining filters, but ignore any subsequent data or trailers. This - // results in creating a header only request/response. + // Continue headers iteration to remaining filters, but ignore any subsequent data or trailers. + // This results in creating a header only request/response. // This status MUST NOT be returned by decodeHeaders() when end_stream is set to true. ContinueAndEndStream, + // Continue headers iteration to remaining filters, but delay ending the stream. This status MUST + // NOT be returned when end_stream is already set to false. + // + // Used when a filter wants to add a body to a headers-only request/response, but this body is not + // readily available. Delaying end_stream allows the filter to add the body once it's available + // without stopping headers iteration. + // + // The filter is responsible to continue the stream by providing a body through calling + // injectDecodedDataToFilterChain()/injectEncodedDataToFilterChain(), possibly multiple times + // if the body needs to be divided into several chunks. The filter may need to handle + // watermark events when injecting a body, see: + // https://github.com/envoyproxy/envoy/blob/master/source/docs/flow_control.md. + // + // The last call to inject data MUST have end_stream set to true to conclude the stream. + // If the filter cannot provide a body the stream should be reset. + // + // Adding a body through calling addDecodedData()/addEncodedData() then + // continueDecoding()/continueEncoding() is currently NOT supported and causes an assert failure. + // + // Adding trailers in this scenario is currently NOT supported. + // + // The filter MUST NOT attempt to continue the stream without providing a body using + // continueDecoding()/continueEncoding(). + // + // TODO(yosrym93): Support adding a body in this case by calling addDecodedData()/addEncodedData() + // then continueDecoding()/continueEncoding(). To support this a new FilterManager::IterationState + // needs to be added and set when a filter returns this status in + // FilterManager::decodeHeaders/FilterManager::encodeHeaders() + // Currently, when a filter returns this, the IterationState is Continue. This causes ASSERTs in + // FilterManager::commonContinue() to fail when continueDecoding()/continueEncoding() is called; + // due to trying to continue iteration when the IterationState is already Continue. + // In this case, a different ASSERT will be needed to make sure the filter does not try to + // continue without adding a body first. + // + // TODO(yosrym93): Support adding trailers in this case by implementing new functions to inject + // trailers, similar to the inject data functions. + ContinueAndDontEndStream, // Do not iterate for headers as well as data and trailers for the current filter and the filters // following, and buffer body data for later dispatching. ContinueDecoding() MUST // be called if continued filter iteration is desired. diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index d135480bc8ecc..35c82287465f2 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -51,7 +51,8 @@ void ActiveStreamFilterBase::commonContinue() { ENVOY_STREAM_LOG(trace, "continuing filter chain: filter={}", *this, static_cast(this)); - ASSERT(!canIterate()); + ASSERT(!canIterate(), + "Attempting to continue iteration while the IterationState is already Continue"); // If iteration has stopped for all frame types, set iterate_from_current_filter_ to true so the // filter iteration starts with the current filter instead of the next one. if (stoppedAll()) { @@ -108,24 +109,36 @@ bool ActiveStreamFilterBase::commonHandleAfter100ContinueHeadersCallback( } bool ActiveStreamFilterBase::commonHandleAfterHeadersCallback(FilterHeadersStatus status, + bool& end_stream, bool& headers_only) { ASSERT(!headers_continued_); ASSERT(canIterate()); - if (status == FilterHeadersStatus::StopIteration) { + switch (status) { + case FilterHeadersStatus::StopIteration: iteration_state_ = IterationState::StopSingleIteration; - } else if (status == FilterHeadersStatus::StopAllIterationAndBuffer) { + break; + case FilterHeadersStatus::StopAllIterationAndBuffer: iteration_state_ = IterationState::StopAllBuffer; - } else if (status == FilterHeadersStatus::StopAllIterationAndWatermark) { + break; + case FilterHeadersStatus::StopAllIterationAndWatermark: iteration_state_ = IterationState::StopAllWatermark; - } else if (status == FilterHeadersStatus::ContinueAndEndStream) { + break; + case FilterHeadersStatus::ContinueAndEndStream: // Set headers_only to true so we know to end early if necessary, // but continue filter iteration so we actually write the headers/run the cleanup code. headers_only = true; ENVOY_STREAM_LOG(debug, "converting to headers only", parent_); - } else { - ASSERT(status == FilterHeadersStatus::Continue); + break; + case FilterHeadersStatus::ContinueAndDontEndStream: + headers_only = false; + end_stream = false; headers_continued_ = true; + ENVOY_STREAM_LOG(debug, "converting to headers and body (body not available yet)", parent_); + break; + case FilterHeadersStatus::Continue: + headers_continued_ = true; + break; } handleMetadataAfterHeadersCallback(); @@ -436,11 +449,28 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead (end_stream && continue_data_entry == decoder_filters_.end()); FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_); - ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_)); + ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_), + "Filters should not return FilterHeadersStatus::ContinueAndEndStream from decodeHeaders " + "when end_stream is already true"); + ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_), + "Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from " + "decodeHeaders when end_stream is already false"); + state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders; ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this, static_cast((*entry).get()), static_cast(status)); + (*entry)->decode_headers_called_ = true; + + // decoding_headers_only_ is set if the filter returns ContinueAndEndStream. + const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback( + status, end_stream, state_.decoding_headers_only_); + + // If this filter ended the stream, decodeComplete() should be called for it. + if ((*entry)->end_stream_ || state_.decoding_headers_only_) { + (*entry)->handle_->decodeComplete(); + } + const bool new_metadata_added = processNewlyAddedMetadata(); // If end_stream is set in headers, and a filter adds new metadata, we need to delay end_stream // in headers by inserting an empty data frame with end_stream set. The empty data frame is sent @@ -454,9 +484,7 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead addDecodedData(*((*entry).get()), empty_data, true); } - (*entry)->decode_headers_called_ = true; - if (!(*entry)->commonHandleAfterHeadersCallback(status, state_.decoding_headers_only_) && - std::next(entry) != decoder_filters_.end()) { + if (!continue_iteration && std::next(entry) != decoder_filters_.end()) { // Stop iteration IFF this is not the last filter. If it is the last filter, continue with // processing since we need to handle the case where a terminal filter wants to buffer, but // a previous filter has added body. @@ -642,7 +670,6 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra if ((*entry)->stoppedAll()) { return; } - ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeTrailers)); state_.filter_call_state_ |= FilterCallState::DecodeTrailers; FilterTrailersStatus status = (*entry)->handle_->decodeTrailers(trailers); @@ -905,16 +932,28 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea (*entry)->end_stream_ = state_.encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end()); FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(headers, (*entry)->end_stream_); - if ((*entry)->end_stream_) { - (*entry)->handle_->encodeComplete(); - } + + ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_), + "Filters should not return FilterHeadersStatus::ContinueAndEndStream from encodeHeaders " + "when end_stream is already true"); + ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_), + "Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from " + "encodeHeaders when end_stream is already false"); + state_.filter_call_state_ &= ~FilterCallState::EncodeHeaders; ENVOY_STREAM_LOG(trace, "encode headers called: filter={} status={}", *this, static_cast((*entry).get()), static_cast(status)); (*entry)->encode_headers_called_ = true; - const auto continue_iteration = - (*entry)->commonHandleAfterHeadersCallback(status, state_.encoding_headers_only_); + + // encoding_headers_only_ is set if the filter returns ContinueAndEndStream. + const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback( + status, end_stream, state_.encoding_headers_only_); + + // If this filter ended the stream, encodeComplete() should be called for it. + if ((*entry)->end_stream_ || state_.encoding_headers_only_) { + (*entry)->handle_->encodeComplete(); + } // If we're encoding a headers only response, then mark the local as complete. This ensures // that we don't attempt to reset the downstream request in doEndStream. @@ -1335,6 +1374,8 @@ void ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data, bool stre void ActiveStreamEncoderFilter::injectEncodedDataToFilterChain(Buffer::Instance& data, bool end_stream) { + // TODO(yosrym93): Check if this filter had previously stopped headers iteration. + // If so, it should be continued before injecting data. parent_.encodeData(this, data, end_stream, FilterManager::FilterIterationStartState::CanStartFromCurrent); } diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index fe2c9d52b0d39..9423e0a8ca4fc 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -31,7 +31,8 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks, // corresponding data. Those functions handle state updates and data storage (if needed) // according to the status returned by filter's callback functions. bool commonHandleAfter100ContinueHeadersCallback(FilterHeadersStatus status); - bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& headers_only); + bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& end_stream, + bool& headers_only); bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data, bool& buffer_was_streaming); bool commonHandleAfterTrailersCallback(FilterTrailersStatus status); @@ -195,9 +196,6 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, FilterHeadersStatus decodeHeaders(RequestHeaderMap& headers, bool end_stream) { is_grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers); FilterHeadersStatus status = handle_->decodeHeaders(headers, end_stream); - if (end_stream) { - handle_->decodeComplete(); - } return status; } diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index 9db973e2a24cf..02c556b14a74f 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -5179,6 +5179,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamHeaders) { EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) .WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream)); + EXPECT_CALL(*decoder_filters_[0], decodeComplete()); EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true)) .WillOnce(Return(FilterHeadersStatus::Continue)); EXPECT_CALL(*decoder_filters_[1], decodeComplete()); @@ -5187,7 +5188,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamHeaders) { Buffer::OwnedImpl fake_input("1234"); conn_manager_->onData(fake_input, true); - EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, true)) + EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false)) .WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream)); EXPECT_CALL(*encoder_filters_[1], encodeComplete()); EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true)) @@ -5199,7 +5200,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamHeaders) { decoder_filters_[1]->callbacks_->streamInfo().setResponseCodeDetails(""); decoder_filters_[1]->callbacks_->encodeHeaders( - makeHeaderMap({{":status", "200"}}), true); + makeHeaderMap({{":status", "200"}}), false); Buffer::OwnedImpl response_body("response"); decoder_filters_[1]->callbacks_->encodeData(response_body, true); @@ -5224,6 +5225,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamData) { EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) .WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream)); + EXPECT_CALL(*decoder_filters_[0], decodeComplete()); EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true)) .WillOnce(Return(FilterHeadersStatus::Continue)); EXPECT_CALL(*decoder_filters_[1], decodeComplete()); @@ -5234,6 +5236,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamData) { EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false)) .WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream)); + EXPECT_CALL(*encoder_filters_[1], encodeComplete()); EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true)) .WillOnce(Return(FilterHeadersStatus::Continue)); EXPECT_CALL(*encoder_filters_[0], encodeComplete()); @@ -5271,6 +5274,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamTrailers) { EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) .WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream)); + EXPECT_CALL(*decoder_filters_[0], decodeComplete()); EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true)) .WillOnce(Return(FilterHeadersStatus::Continue)); EXPECT_CALL(*decoder_filters_[1], decodeComplete()); @@ -5281,6 +5285,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamTrailers) { EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false)) .WillOnce(Return(FilterHeadersStatus::ContinueAndEndStream)); + EXPECT_CALL(*encoder_filters_[1], encodeComplete()); EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true)) .WillOnce(Return(FilterHeadersStatus::Continue)); EXPECT_CALL(*encoder_filters_[0], encodeComplete()); @@ -5299,6 +5304,63 @@ TEST_F(HttpConnectionManagerImplTest, FilterContinueAndEndStreamTrailers) { decoder_filters_[1]->callbacks_->encodeTrailers(std::move(response_trailers)); } +// Filter continues headers iteration without ending the stream, then injects a body later. +TEST_F(HttpConnectionManagerImplTest, FilterContinueDontEndStreamInjectBody) { + InSequence s; + setup(false, ""); + + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> Http::Status { + RequestDecoder* decoder = &conn_manager_->newStream(response_encoder_); + auto headers = makeHeaderMap( + {{":authority", "host"}, {":path", "/"}, {":method", "GET"}}); + decoder->decodeHeaders(std::move(headers), true); + return Http::okStatus(); + })); + + setupFilterChain(2, 2); + + // Decode filter 0 changes end_stream to false. + EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true)) + .WillOnce(Return(FilterHeadersStatus::ContinueAndDontEndStream)); + EXPECT_CALL(*decoder_filters_[0], decodeComplete()); + EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + + // Kick off the incoming data. + Buffer::OwnedImpl fake_input("1234"); + conn_manager_->onData(fake_input, true); + + EXPECT_CALL(*decoder_filters_[1], decodeData(_, true)) + .WillOnce(Return(FilterDataStatus::Continue)); + EXPECT_CALL(*decoder_filters_[1], decodeComplete()); + + // Decode filter 0 injects request body later. + Buffer::OwnedImpl data("hello"); + decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(data, true); + + // Encode filter 1 changes end_stream to false. + EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, true)) + .WillOnce(Return(FilterHeadersStatus::ContinueAndDontEndStream)); + EXPECT_CALL(*encoder_filters_[1], encodeComplete()); + EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + EXPECT_CALL(response_encoder_, encodeHeaders(_, false)); + + decoder_filters_[1]->callbacks_->streamInfo().setResponseCodeDetails(""); + decoder_filters_[1]->callbacks_->encodeHeaders( + makeHeaderMap({{":status", "200"}}), true); + + EXPECT_CALL(*encoder_filters_[0], encodeData(_, true)) + .WillOnce(Return(FilterDataStatus::Continue)); + EXPECT_CALL(*encoder_filters_[0], encodeComplete()); + EXPECT_CALL(response_encoder_, encodeData(_, true)); + expectOnDestroy(); + + // Encode filter 1 injects request body later. + Buffer::OwnedImpl data2("hello"); + encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(data2, true); +} + TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) { InSequence s; setup(false, ""); diff --git a/test/integration/BUILD b/test/integration/BUILD index e9deb4ac00b1f..f2081b42f6d1c 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -421,6 +421,7 @@ envoy_cc_test( "//source/common/http:header_map_lib", "//source/extensions/filters/http/buffer:config", "//source/extensions/filters/http/health_check:config", + "//test/integration/filters:continue_headers_only_inject_body", "//test/integration/filters:encoder_decoder_buffer_filter_lib", "//test/integration/filters:random_pause_filter_lib", "//test/test_common:utility_lib", diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index bffde48d8d77f..1746a3952626e 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -24,6 +24,21 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "continue_headers_only_inject_body", + srcs = [ + "continue_headers_only_inject_body_filter.cc", + ], + deps = [ + ":common_lib", + "//include/envoy/http:filter_interface", + "//include/envoy/registry", + "//include/envoy/server:filter_config_interface", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "//test/extensions/filters/http/common:empty_http_filter_config_lib", + ], +) + envoy_cc_test_library( name = "wait_for_whole_request_and_response_config_lib", srcs = [ diff --git a/test/integration/filters/continue_headers_only_inject_body_filter.cc b/test/integration/filters/continue_headers_only_inject_body_filter.cc new file mode 100644 index 0000000000000..a7f1b952e4ceb --- /dev/null +++ b/test/integration/filters/continue_headers_only_inject_body_filter.cc @@ -0,0 +1,47 @@ +#include + +#include "envoy/http/filter.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "common/buffer/buffer_impl.h" + +#include "extensions/filters/http/common/pass_through_filter.h" + +#include "test/extensions/filters/http/common/empty_http_filter_config.h" +#include "test/integration/filters/common.h" + +namespace Envoy { + +// A test filter that continues iteration of headers-only request/response without ending the +// stream, then injects a body later. +class ContinueHeadersOnlyInjectBodyFilter : public Http::PassThroughFilter { +public: + constexpr static char name[] = "continue-headers-only-inject-body-filter"; + + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers, bool) override { + headers.setContentLength(body_.length()); + decoder_callbacks_->dispatcher().post([this]() -> void { + Buffer::OwnedImpl buffer(body_); + decoder_callbacks_->injectDecodedDataToFilterChain(buffer, true); + }); + return Http::FilterHeadersStatus::ContinueAndDontEndStream; + } + + Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers, bool) override { + headers.setContentLength(body_.length()); + encoder_callbacks_->dispatcher().post([this]() -> void { + Buffer::OwnedImpl buffer(body_); + encoder_callbacks_->injectEncodedDataToFilterChain(buffer, true); + }); + return Http::FilterHeadersStatus::ContinueAndDontEndStream; + } + +private: + constexpr static absl::string_view body_ = "body"; +}; + +static Registry::RegisterFactory, + Server::Configuration::NamedHttpFilterConfigFactory> + register_; +} // namespace Envoy diff --git a/test/integration/protocol_integration_test.cc b/test/integration/protocol_integration_test.cc index dc798683bcb1e..719e34954f2d1 100644 --- a/test/integration/protocol_integration_test.cc +++ b/test/integration/protocol_integration_test.cc @@ -258,6 +258,34 @@ TEST_P(ProtocolIntegrationTest, AddBodyToResponseAndWaitForIt) { EXPECT_EQ("body", response->body()); } +TEST_P(ProtocolIntegrationTest, ContinueHeadersOnlyInjectBodyFilter) { + config_helper_.addFilter(R"EOF( + name: continue-headers-only-inject-body-filter + typed_config: + "@type": type.googleapis.com/google.protobuf.Empty + )EOF"); + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Send a headers only request. + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + waitForNextUpstreamRequest(); + + // Make sure that the body was injected to the request. + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(upstream_request_->receivedData()); + EXPECT_EQ(upstream_request_->body().toString(), "body"); + + // Send a headers only response. + upstream_request_->encodeHeaders(default_response_headers_, true); + response->waitForEndStream(); + + // Make sure that the body was injected to the response. + EXPECT_TRUE(response->complete()); + EXPECT_EQ(response->body(), "body"); +} + TEST_P(ProtocolIntegrationTest, AddEncodedTrailers) { config_helper_.addFilter(R"EOF( name: add-trailers-filter