diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 3b82cdf26b855..3b81f97a9a046 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -3,6 +3,7 @@ Version history 1.8.0 (Pending) =============== +* http: response filters not applied to early error paths such as http_parser generated 400s. * ratelimit: added support for :repo:`api/envoy/service/ratelimit/v2/rls.proto`. Lyft's reference implementation of the `ratelimit `_ service also supports the data-plane-api proto as of v1.1.0. Envoy can use either proto to send client requests to a ratelimit server with the use of the diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 4ee287a161340..2d2809469697c 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -189,7 +189,6 @@ StreamDecoder& ConnectionManagerImpl::newStream(StreamEncoder& response_encoder) new_stream->response_encoder_ = &response_encoder; new_stream->response_encoder_->getStream().addCallbacks(*new_stream); new_stream->buffer_limit_ = new_stream->response_encoder_->getStream().bufferLimit(); - config_.filterFactory().createFilterChain(*new_stream); // Make sure new streams are apprised that the underlying connection is blocked. if (read_callbacks_->connection().aboveHighWatermark()) { new_stream->callHighWatermarkCallbacks(); @@ -447,8 +446,10 @@ const Network::Connection* ConnectionManagerImpl::ActiveStream::connection() { } void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) { - maybeEndDecode(end_stream); request_headers_ = std::move(headers); + createFilterChain(); + + maybeEndDecode(end_stream); ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream, *request_headers_); @@ -1116,6 +1117,10 @@ void ConnectionManagerImpl::ActiveStream::setBufferLimit(uint32_t new_limit) { } } +void ConnectionManagerImpl::ActiveStream::createFilterChain() { + connection_manager_.config_.filterFactory().createFilterChain(*this); +} + void ConnectionManagerImpl::ActiveStreamFilterBase::commonContinue() { // TODO(mattklein123): Raise an error if this is called during a callback. if (!canContinue()) { diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 9cefbfd254ef4..2bfa88ceeb295 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -360,6 +360,8 @@ class ConnectionManagerImpl : Logger::Loggable, // Possibly increases buffer_limit_ to the value of limit. void setBufferLimit(uint32_t limit); + // Set up the Encoder/Decoder filter chain. + void createFilterChain(); ConnectionManagerImpl& connection_manager_; Router::ConfigConstSharedPtr snapped_route_config_; diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index c218c9cea7e49..636187c55607a 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -1657,14 +1657,13 @@ TEST_F(HttpConnectionManagerImplTest, DownstreamDisconnect) { data.drain(2); })); - setupFilterChain(1, 0); + EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0); // Kick off the incoming data. Buffer::OwnedImpl fake_input("1234"); conn_manager_->onData(fake_input, false); // Now raise a remote disconnection, we should see the filter get reset called. - EXPECT_CALL(*decoder_filters_[0], onDestroy()); conn_manager_->onEvent(Network::ConnectionEvent::RemoteClose); } @@ -1677,10 +1676,9 @@ TEST_F(HttpConnectionManagerImplTest, DownstreamProtocolError) { throw CodecProtocolException("protocol error"); })); - setupFilterChain(1, 0); + EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0); // A protocol exception should result in reset of the streams followed by a local close. - EXPECT_CALL(*decoder_filters_[0], onDestroy()); EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite)); // Kick off the incoming data. @@ -2225,6 +2223,120 @@ TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOn) { decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks); } +TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOnWithLazyCreation) { + setup(false, ""); + + // Make sure codec_ is created. + EXPECT_CALL(*codec_, dispatch(_)); + Buffer::OwnedImpl fake_input(""); + conn_manager_->onData(fake_input, false); + + // Mark the connection manger as backed up before the stream is created. + ASSERT_EQ(decoder_filters_.size(), 0); + EXPECT_CALL(*codec_, onUnderlyingConnectionAboveWriteBufferHighWatermark()); + conn_manager_->onAboveWriteBufferHighWatermark(); + + // Create the stream. Defer the creation of the filter chain by not sending + // complete headers. + StreamDecoder* decoder; + { + setUpBufferLimits(); + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { + decoder = &conn_manager_->newStream(response_encoder_); + })); + + // Verify the high watermark is passed on. + EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true)); + + // Send fake data to kick off newStream being created. + Buffer::OwnedImpl fake_input2("asdf"); + conn_manager_->onData(fake_input2, false); + } + + // Now set up the filter chain by sending full headers. The filters should be + // immediately appraised that the low watermark is in effect. + { + setupFilterChain(2, 2); + EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).Times(0); + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { + HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}}}; + decoder->decodeHeaders(std::move(headers), true); + })); + EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true)) + .WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus { + Buffer::OwnedImpl data("hello"); + decoder_filters_[0]->callbacks_->addDecodedData(data, true); + return FilterHeadersStatus::Continue; + })); + sendReqestHeadersAndData(); + ASSERT_GE(decoder_filters_.size(), 1); + MockDownstreamWatermarkCallbacks callbacks; + EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); + decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks); + } +} + +TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksUnwoundWithLazyCreation) { + setup(false, ""); + + // Make sure codec_ is created. + EXPECT_CALL(*codec_, dispatch(_)); + Buffer::OwnedImpl fake_input(""); + conn_manager_->onData(fake_input, false); + + // Mark the connection manger as backed up before the stream is created. + ASSERT_EQ(decoder_filters_.size(), 0); + EXPECT_CALL(*codec_, onUnderlyingConnectionAboveWriteBufferHighWatermark()); + conn_manager_->onAboveWriteBufferHighWatermark(); + + // Create the stream. Defer the creation of the filter chain by not sending + // complete headers. + StreamDecoder* decoder; + { + setUpBufferLimits(); + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { + decoder = &conn_manager_->newStream(response_encoder_); + })); + + // Verify the high watermark is passed on. + EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true)); + + // Send fake data to kick off newStream being created. + Buffer::OwnedImpl fake_input2("asdf"); + conn_manager_->onData(fake_input2, false); + } + + // Now before the filter chain is created, fire the low watermark callbacks + // and ensure it is passed down to the stream. + ASSERT(stream_callbacks_ != nullptr); + EXPECT_CALL(*codec_, onUnderlyingConnectionBelowWriteBufferLowWatermark()) + .WillOnce(Invoke([&]() -> void { stream_callbacks_->onBelowWriteBufferLowWatermark(); })); + conn_manager_->onBelowWriteBufferLowWatermark(); + + // Now set up the filter chain by sending full headers. The filters should + // not get any watermark callbacks. + { + setupFilterChain(2, 2); + EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).Times(0); + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { + HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}}}; + decoder->decodeHeaders(std::move(headers), true); + })); + EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true)) + .WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus { + Buffer::OwnedImpl data("hello"); + decoder_filters_[0]->callbacks_->addDecodedData(data, true); + return FilterHeadersStatus::Continue; + })); + sendReqestHeadersAndData(); + ASSERT_GE(decoder_filters_.size(), 1); + MockDownstreamWatermarkCallbacks callbacks; + EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()).Times(0); + EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()).Times(0); + decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks); + } +} + TEST_F(HttpConnectionManagerImplTest, AlterFilterWatermarkLimits) { initial_buffer_limit_ = 100; setup(false, "");