diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 966be2858a38d..bd23feb88ba55 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -205,10 +205,10 @@ StreamDecoder& ConnectionManagerImpl::newStream(StreamEncoder& response_encoder) new_stream->response_encoder_ = &response_encoder; new_stream->response_encoder_->getStream().addCallbacks(*new_stream); new_stream->buffer_limit_ = new_stream->response_encoder_->getStream().bufferLimit(); - // Make sure new streams are apprised that the underlying connection is blocked. - if (read_callbacks_->connection().aboveHighWatermark()) { - new_stream->callHighWatermarkCallbacks(); - } + // If the network connection is backed up, the stream should be made aware of it on creation. + // Both HTTP/1.x and HTTP/2 codecs handle this in StreamCallbackHelper::addCallbacks_. + ASSERT(read_callbacks_->connection().aboveHighWatermark() == false || + new_stream->high_watermark_count_ > 0); new_stream->moveIntoList(std::move(new_stream), streams_); return **streams_.begin(); } diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 24d9d6d49c3e1..89c34500e551b 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -21,6 +21,12 @@ namespace Http1 { const std::string StreamEncoderImpl::CRLF = "\r\n"; const std::string StreamEncoderImpl::LAST_CHUNK = "0\r\n\r\n"; +StreamEncoderImpl::StreamEncoderImpl(ConnectionImpl& connection) : connection_(connection) { + if (connection_.connection().aboveHighWatermark()) { + runHighWatermarkCallbacks(); + } +} + void StreamEncoderImpl::encodeHeader(const char* key, uint32_t key_size, const char* value, uint32_t value_size) { diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index c3ac78cab1433..272dfd2ee2d17 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -49,7 +49,7 @@ class StreamEncoderImpl : public StreamEncoder, void isResponseToHeadRequest(bool value) { is_response_to_head_request_ = value; } protected: - StreamEncoderImpl(ConnectionImpl& connection) : connection_(connection) {} + StreamEncoderImpl(ConnectionImpl& connection); static const std::string CRLF; static const std::string LAST_CHUNK; diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 7f1fb01b9bc4e..b71fd0127e7cd 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -20,6 +20,8 @@ #include "absl/types/optional.h" namespace Envoy { +class TestPauseFilter; + namespace Network { /** @@ -149,6 +151,8 @@ class ConnectionImpl : public virtual Connection, Event::FileEventPtr file_event_; private: + friend class ::Envoy::TestPauseFilter; + void onFileEvent(uint32_t events); void onRead(uint64_t read_buffer_size); void onReadReady(); diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index c58f3fd8df7d5..017a7bb1f3a3b 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -2722,78 +2722,6 @@ TEST_F(HttpConnectionManagerImplTest, UpstreamWatermarkCallbacks) { HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, true); } -TEST_F(HttpConnectionManagerImplTest, DownstreamWatermarkCallbacks) { - setup(false, ""); - setUpEncoderAndDecoder(); - sendReqestHeadersAndData(); - - // Test what happens when there are no subscribers. - conn_manager_->onAboveWriteBufferHighWatermark(); - conn_manager_->onBelowWriteBufferLowWatermark(); - - // The connection manger will outlive callbacks but never reference them once deleted. - MockDownstreamWatermarkCallbacks callbacks; - - // Network::Connection callbacks are passed through the codec - ASSERT(decoder_filters_[0]->callbacks_ != nullptr); - EXPECT_CALL(*codec_, onUnderlyingConnectionAboveWriteBufferHighWatermark()); - conn_manager_->onAboveWriteBufferHighWatermark(); - EXPECT_CALL(*codec_, onUnderlyingConnectionBelowWriteBufferLowWatermark()); - conn_manager_->onBelowWriteBufferLowWatermark(); - - // Now add a watermark subscriber and make sure both the high and low watermark callbacks are - // propogated. - ASSERT_NE(0, decoder_filters_.size()); - decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks); - // Make sure encoder filter callbacks are propogated to the watermark subscriber. - EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); - encoder_filters_[0]->callbacks_->onEncoderFilterAboveWriteBufferHighWatermark(); - EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()); - encoder_filters_[0]->callbacks_->onEncoderFilterBelowWriteBufferLowWatermark(); - - ASSERT(stream_callbacks_ != nullptr); - // Finally make sure that watermark events on the downstream stream are passed to the watermark - // subscriber. - EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); - stream_callbacks_->onAboveWriteBufferHighWatermark(); - EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()); - stream_callbacks_->onBelowWriteBufferLowWatermark(); - - // Set things up so the callbacks have been called twice. - EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); - stream_callbacks_->onAboveWriteBufferHighWatermark(); - EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); - encoder_filters_[0]->callbacks_->onEncoderFilterAboveWriteBufferHighWatermark(); - - // Now unsubscribe and verify no further callbacks are called. - EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()).Times(0); - decoder_filters_[0]->callbacks_->removeDownstreamWatermarkCallbacks(callbacks); -} - -TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOn) { - setup(false, ""); - - // Make sure codec_ is created. - EXPECT_CALL(*codec_, dispatch(_)); - Buffer::OwnedImpl fake_input(""); - conn_manager_->onData(fake_input, false); - - // Mark the connection manger as backed up before the stream is created. - ASSERT_EQ(decoder_filters_.size(), 0); - EXPECT_CALL(*codec_, onUnderlyingConnectionAboveWriteBufferHighWatermark()); - conn_manager_->onAboveWriteBufferHighWatermark(); - - // Now when the stream is created, it should be informed of the connection - // callbacks immediately. - EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true)); - setUpEncoderAndDecoder(); - sendReqestHeadersAndData(); - ASSERT_GE(decoder_filters_.size(), 1); - MockDownstreamWatermarkCallbacks callbacks; - EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); - decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks); -} - TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOnWithLazyCreation) { setup(false, ""); @@ -2814,11 +2742,10 @@ TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOnWith setUpBufferLimits(); EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { decoder = &conn_manager_->newStream(response_encoder_); + // Call the high buffer callbacks as the codecs do. + stream_callbacks_->onAboveWriteBufferHighWatermark(); })); - // Verify the high watermark is passed on. - EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true)); - // Send fake data to kick off newStream being created. Buffer::OwnedImpl fake_input2("asdf"); conn_manager_->onData(fake_input2, false); @@ -2868,11 +2795,10 @@ TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksUnwoundWithL setUpBufferLimits(); EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { decoder = &conn_manager_->newStream(response_encoder_); + // Call the high buffer callbacks as the codecs do. + stream_callbacks_->onAboveWriteBufferHighWatermark(); })); - // Verify the high watermark is passed on. - EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true)); - // Send fake data to kick off newStream being created. Buffer::OwnedImpl fake_input2("asdf"); conn_manager_->onData(fake_input2, false); diff --git a/test/integration/BUILD b/test/integration/BUILD index 2335f5f9bdf76..aeadeb6a82665 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -189,6 +189,17 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "pass_through_filter_lib", + srcs = [ + "pass_through_filter.h", + ], + deps = [ + "//include/envoy/http:filter_interface", + "//include/envoy/registry", + ], +) + envoy_cc_test_library( name = "add_trailers_filter_config_lib", srcs = [ @@ -216,8 +227,10 @@ envoy_cc_test_library( deps = [ ":add_trailers_filter_config_lib", ":integration_lib", + ":pass_through_filter_lib", ":test_host_predicate_lib", "//include/envoy/event:timer_interface", + "//source/common/common:thread_annotations", "//source/extensions/filters/http/router:config", "//source/extensions/filters/network/http_connection_manager:config", "//test/common/upstream:utility_lib", diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index 60f5c30e9d084..d869a36a7c667 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -87,6 +87,8 @@ TEST_P(Http2IntegrationTest, RouterUpstreamResponseBeforeRequestComplete) { TEST_P(Http2IntegrationTest, TwoRequests) { testTwoRequests(); } +TEST_P(Http2IntegrationTest, TwoRequestsWithForcedBackup) { testTwoRequests(true); } + TEST_P(Http2IntegrationTest, Retry) { testRetry(); } TEST_P(Http2IntegrationTest, EnvoyHandling100Continue) { testEnvoyHandling100Continue(); } diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index 2d0a5ed81ebe2..3b8ac51673088 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -10,18 +10,23 @@ #include "envoy/buffer/buffer.h" #include "envoy/event/dispatcher.h" #include "envoy/http/header_map.h" +#include "envoy/registry/registry.h" #include "common/api/api_impl.h" #include "common/buffer/buffer_impl.h" #include "common/common/fmt.h" +#include "common/common/thread_annotations.h" #include "common/http/headers.h" #include "common/network/connection_impl.h" #include "common/network/utility.h" #include "common/protobuf/utility.h" #include "common/upstream/upstream_impl.h" +#include "extensions/filters/http/common/empty_http_filter_config.h" + #include "test/common/upstream/utility.h" #include "test/integration/autonomous_upstream.h" +#include "test/integration/pass_through_filter.h" #include "test/integration/test_host_predicate_config.h" #include "test/integration/utility.h" #include "test/mocks/upstream/mocks.h" @@ -1275,7 +1280,94 @@ void HttpIntegrationTest::testUpstreamDisconnectWithTwoRequests() { test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 2); } -void HttpIntegrationTest::testTwoRequests() { +// This filter exists to synthetically test network backup by faking TCP +// connection back-up when an encode is finished, and unblocking it when the +// next stream starts to decode headers. +// Allows regression tests for https://github.com/envoyproxy/envoy/issues/4541 +// TODO(alyssawilk) move this somewhere general and turn it up for more tests. +class TestPauseFilter : public PassThroughFilter { +public: + // Pass in a some global filter state to ensure the Network::Connection is + // blocked and unblocked exactly once. + TestPauseFilter(absl::Mutex& encode_lock, uint32_t& number_of_encode_calls_ref, + uint32_t& number_of_decode_calls_ref) + : encode_lock_(encode_lock), number_of_encode_calls_ref_(number_of_encode_calls_ref), + number_of_decode_calls_ref_(number_of_decode_calls_ref) {} + + Http::FilterDataStatus decodeData(Buffer::Instance& buf, bool end_stream) override { + if (end_stream) { + absl::WriterMutexLock m(&encode_lock_); + number_of_decode_calls_ref_++; + if (number_of_decode_calls_ref_ == 2) { + // If this is the second stream to decode headers, force low watermark state. + connection()->onLowWatermark(); + } + } + return PassThroughFilter::decodeData(buf, end_stream); + } + + Http::FilterDataStatus encodeData(Buffer::Instance& buf, bool end_stream) override { + if (end_stream) { + absl::WriterMutexLock m(&encode_lock_); + number_of_encode_calls_ref_++; + if (number_of_encode_calls_ref_ == 1) { + // If this is the first stream to encode headers, force high watermark state. + connection()->onHighWatermark(); + } + } + return PassThroughFilter::encodeData(buf, end_stream); + } + + Network::ConnectionImpl* connection() { + // As long as we're doing horrible things let's do *all* the horrible things. + // Assert the connection we have is a ConnectionImpl and const cast it so we + // can force watermark changes. + auto conn_impl = dynamic_cast(decoder_callbacks_->connection()); + return const_cast(conn_impl); + } + + absl::Mutex& encode_lock_; + uint32_t& number_of_encode_calls_ref_; + uint32_t& number_of_decode_calls_ref_; +}; + +class TestPauseFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig { +public: + TestPauseFilterConfig() : EmptyHttpFilterConfig("pause-filter") {} + + Http::FilterFactoryCb createFilter(const std::string&, Server::Configuration::FactoryContext&) { + return [&](Http::FilterChainFactoryCallbacks& callbacks) -> void { + // GUARDED_BY insists the lock be held when the guarded variables are passed by reference. + absl::WriterMutexLock m(&encode_lock_); + callbacks.addStreamFilter(std::make_shared<::Envoy::TestPauseFilter>( + encode_lock_, number_of_encode_calls_, number_of_decode_calls_)); + }; + } + + absl::Mutex encode_lock_; + uint32_t number_of_encode_calls_ GUARDED_BY(encode_lock_) = 0; + uint32_t number_of_decode_calls_ GUARDED_BY(encode_lock_) = 0; +}; + +// perform static registration +static Registry::RegisterFactory + register_; + +void HttpIntegrationTest::testTwoRequests(bool network_backup) { + // if network_backup is false, this simply tests that Envoy can handle multiple + // requests on a connection. + // + // If network_backup is true, the first request will explicitly set the TCP level flow control + // as blocked as it finishes the encode and set a timer to unblock. The second stream should be + // created while the socket appears to be in the high watermark state, and regression tests that + // flow control will be corrected as the socket "becomes unblocked" + if (network_backup) { + config_helper_.addFilter(R"EOF( + name: pause-filter + config: {} + )EOF"); + } initialize(); codec_client_ = makeHttpConnection(lookupPort("http")); diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 30e2eb905e835..0559715cc784c 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -134,7 +134,7 @@ class HttpIntegrationTest : public BaseIntegrationTest { void testRouterDownstreamDisconnectBeforeResponseComplete( ConnectionCreationFunction* creator = nullptr); void testRouterUpstreamResponseBeforeRequestComplete(); - void testTwoRequests(); + void testTwoRequests(bool force_network_backup = false); void testOverlyLongHeaders(); void testIdleTimeoutBasic(); void testIdleTimeoutWithTwoRequests(); diff --git a/test/integration/integration_test.cc b/test/integration/integration_test.cc index ed73f001a2538..fd1050cce4eab 100644 --- a/test/integration/integration_test.cc +++ b/test/integration/integration_test.cc @@ -131,6 +131,8 @@ TEST_P(IntegrationTest, EnvoyProxyingLate100ContinueWithEncoderFilter) { TEST_P(IntegrationTest, TwoRequests) { testTwoRequests(); } +TEST_P(IntegrationTest, TwoRequestsWithForcedBackup) { testTwoRequests(true); } + TEST_P(IntegrationTest, UpstreamDisconnectWithTwoRequests) { testUpstreamDisconnectWithTwoRequests(); } diff --git a/test/integration/pass_through_filter.h b/test/integration/pass_through_filter.h new file mode 100644 index 0000000000000..7e4c4fa3300cd --- /dev/null +++ b/test/integration/pass_through_filter.h @@ -0,0 +1,50 @@ +#pragma once + +#include "envoy/http/filter.h" + +namespace Envoy { + +// TODO(alyssawilk) move add_trailers_filter to use this. +// A filter which passes all data through with Continue status. +class PassThroughFilter : public Http::StreamFilter { +public: + // Http::StreamFilterBase + void onDestroy() override {} + + // Http::StreamDecoderFilter + Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap&, bool) override { + return Http::FilterHeadersStatus::Continue; + } + Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override { + return Http::FilterDataStatus::Continue; + } + + Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap&) override { + return Http::FilterTrailersStatus::Continue; + } + void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override { + decoder_callbacks_ = &callbacks; + } + + // Http::StreamEncoderFilter + Http::FilterHeadersStatus encode100ContinueHeaders(Http::HeaderMap&) override { + return Http::FilterHeadersStatus::Continue; + } + Http::FilterHeadersStatus encodeHeaders(Http::HeaderMap&, bool) override { + return Http::FilterHeadersStatus::Continue; + } + Http::FilterDataStatus encodeData(Buffer::Instance&, bool) override { + return Http::FilterDataStatus::Continue; + } + Http::FilterTrailersStatus encodeTrailers(Http::HeaderMap&) override { + return Http::FilterTrailersStatus::Continue; + } + void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override { + encoder_callbacks_ = &callbacks; + } + +protected: + Http::StreamDecoderFilterCallbacks* decoder_callbacks_{}; + Http::StreamEncoderFilterCallbacks* encoder_callbacks_{}; +}; +} // namespace Envoy