diff --git a/docs/configuration/cluster_manager/cluster_stats.rst b/docs/configuration/cluster_manager/cluster_stats.rst index ebd6ab084f51b..01bffed7f73d3 100644 --- a/docs/configuration/cluster_manager/cluster_stats.rst +++ b/docs/configuration/cluster_manager/cluster_stats.rst @@ -60,6 +60,7 @@ Every cluster has a statistics tree rooted at *cluster..* with the followi membership_change, Counter, Total cluster membership changes membership_healthy, Gauge, Current cluster healthy total (inclusive of both health checking and outlier detection) membership_total, Gauge, Current cluster membership total + retry_or_shadow_abandoned, Counter, Total number of times shadowing or retry buffering was canceled due to buffer limits. update_attempt, Counter, Total cluster membership update attempts update_success, Counter, Total cluster membership update successes update_failure, Counter, Total cluster membership update failures diff --git a/docs/configuration/http_conn_man/stats.rst b/docs/configuration/http_conn_man/stats.rst index 26fc9ca7d585b..c7be8974c1a6e 100644 --- a/docs/configuration/http_conn_man/stats.rst +++ b/docs/configuration/http_conn_man/stats.rst @@ -44,6 +44,7 @@ statistics: downstream_rq_rx_reset, Counter, Total request resets received downstream_rq_tx_reset, Counter, Total request resets sent downstream_rq_non_relative_path, Counter, Total requests with a non-relative HTTP path + downstream_rq_too_large, Counter, Total requests resulting in a 413 due to buffering an overly large body. downstream_rq_2xx, Counter, Total 2xx responses downstream_rq_3xx, Counter, Total 3xx responses downstream_rq_4xx, Counter, Total 4xx responses @@ -51,6 +52,7 @@ statistics: downstream_rq_ws_on_non_ws_route, Counter, Total WebSocket upgrade requests rejected by non WebSocket routes downstream_rq_non_ws_on_ws_route, Counter, Total HTTP requests rejected by WebSocket enabled routes due to missing upgrade header downstream_rq_time, Timer, Request time milliseconds + rs_too_large, Counter, Total response errors due to buffering an overly large body. Per user agent statistics ------------------------- diff --git a/docs/configuration/http_filters/buffer_filter.rst b/docs/configuration/http_filters/buffer_filter.rst index 1b8fe333e3ee9..81d19a0736d83 100644 --- a/docs/configuration/http_filters/buffer_filter.rst +++ b/docs/configuration/http_filters/buffer_filter.rst @@ -19,8 +19,8 @@ with partial requests and high network latency. } max_request_bytes - *(required, integer)* The maximum request size that the filter will before before it stops - buffering and returns a 413 response. + *(required, integer)* The maximum request size that the filter will before the connection manager + will stop buffering and return a 413 response. max_request_time_s *(required, integer)* The maximum amount of time that the filter will wait for a complete request @@ -37,4 +37,3 @@ prefix ` comes from the owning HTTP connection :widths: 1, 1, 2 rq_timeout, Counter, Total requests that timed out waiting for a full request - rq_too_large, Counter, Total requests that failed due to being too large diff --git a/include/envoy/http/codec.h b/include/envoy/http/codec.h index 5f4935e4f434c..b1de0741074e2 100644 --- a/include/envoy/http/codec.h +++ b/include/envoy/http/codec.h @@ -151,6 +151,13 @@ class Stream { * @param disable informs if reads should be disabled (true) or re-enabled (false). */ virtual void readDisable(bool disable) PURE; + + /* + * Return the number of bytes this stream is allowed to buffer, or 0 if there is no limit + * configured. + * @return uint32_t the stream's configured buffer limits. + */ + virtual uint32_t bufferLimit() PURE; }; /** diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index b78e4e8a30ef6..313f192772ba2 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -42,7 +42,24 @@ enum class FilterDataStatus { // Do not iterate to any of the remaining filters in the chain, and buffer body data for later // dispatching. Returning FilterDataStatus::Continue from decodeData()/encodeData() or calling // continueDecoding()/continueEncoding() MUST be called if continued filter iteration is desired. + // + // This should be called by filters which must parse a larger block of the incoming data before + // continuing processing and so can not push back on streaming data via watermarks. + // + // If buffering the request causes buffered data to exceed the configured buffer limit, a 413 will + // be sent to the user. On the response path exceeding buffer limits will result in a 500. StopIterationAndBuffer, + // Do not iterate to any of the remaining filters in the chain, and buffer body data for later + // dispatching. Returning FilterDataStatus::Continue from decodeData()/encodeData() or calling + // continueDecoding()/continueEncoding() MUST be called if continued filter iteration is desired. + // + // This will cause the flow of incoming data to cease until one of the continue.*() functions is + // called. + // + // This should be returned by filters which can nominally stream data but have a transient back-up + // such as the configured delay of the fault filter, or if the router filter is still fetching an + // upstream connection. + StopIterationAndWatermark, // Do not iterate to any of the remaining filters in the chain, but do not buffer any of the // body data for later dispatching. Returning FilterDataStatus::Continue from // decodeData()/encodeData() or calling continueDecoding()/continueEncoding() MUST be called if @@ -163,8 +180,11 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { * followed by decodeTrailers(). * * It is an error to call this method in any other case. + * + * @param data Buffer::Instance supplies the data to be decoded. + * @param streaming_filter boolean supplies if this filter streams data or buffers the full body. */ - virtual void addDecodedData(Buffer::Instance& data) PURE; + virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE; /** * Called with headers to be encoded, optionally indicating end of stream. @@ -222,6 +242,22 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { * It is not safe to call this from under the stack of a DownstreamWatermarkCallbacks callback. */ virtual void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& callbacks) PURE; + + /** + * This routine may be called to change the buffer limit for decoder filters. + * + * @param boolean supplies the desired buffer limit. + */ + virtual void setDecoderBufferLimit(uint32_t limit) PURE; + + /** + * This routine returns the current buffer limit for decoder filters. Filters should abide by + * this limit or change it via setDecoderBufferLimit. + * A buffer limit of 0 bytes indicates no limits are applied. + * + * @return the buffer limit the filter should apply. + */ + virtual uint32_t decoderBufferLimit() PURE; }; /** @@ -319,8 +355,11 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks { * followed by encodeTrailers(). * * It is an error to call this method in any other case. + * + * @param data Buffer::Instance supplies the data to be encoded. + * @param streaming_filter boolean supplies if this filter streams data or buffers the full body. */ - virtual void addEncodedData(Buffer::Instance& data) PURE; + virtual void addEncodedData(Buffer::Instance& data, bool streaming_filter) PURE; /** * Called when an encoder filter goes over its high watermark. @@ -331,6 +370,22 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks { * Called when a encoder filter goes from over its high watermark to under its low watermark. */ virtual void onEncoderFilterBelowWriteBufferLowWatermark() PURE; + + /** + * This routine may be called to change the buffer limit for encoder filters. + * + * @limit settings supplies the desired buffer limit. + */ + virtual void setEncoderBufferLimit(uint32_t limit) PURE; + + /** + * This routine returns the current buffer limit for encoder filters. Filters should abide by + * this limit or change it via setEncoderBufferLimit. + * A buffer limit of 0 bytes indicates no limits are applied. + * + * @return the buffer limit the filter should apply. + */ + virtual uint32_t encoderBufferLimit() PURE; }; /** diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index d067ea00ae507..08b476a5dd02f 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -231,6 +231,7 @@ class HostSet { COUNTER(membership_change) \ GAUGE (membership_healthy) \ GAUGE (membership_total) \ + COUNTER(retry_or_shadow_abandoned) \ COUNTER(update_attempt) \ COUNTER(update_success) \ COUNTER(update_failure) diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index a7f4648a49734..9eb32b1815ee7 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -59,7 +59,7 @@ int WatermarkBuffer::write(int fd) { } void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) { - ASSERT(low_watermark < high_watermark); + ASSERT(low_watermark < high_watermark || (high_watermark == 0 && low_watermark == 0)); low_watermark_ = low_watermark; high_watermark_ = high_watermark; checkHighWatermark(); @@ -67,7 +67,8 @@ void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_waterm } void WatermarkBuffer::checkLowWatermark() { - if (!above_high_watermark_called_ || OwnedImpl::length() >= low_watermark_) { + if (!above_high_watermark_called_ || + (high_watermark_ != 0 && OwnedImpl::length() >= low_watermark_)) { return; } diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 9f4480fd12517..1bdc557880f1f 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -33,12 +33,9 @@ class WatermarkBuffer : public OwnedImpl { int write(int fd) override; void postProcess() override { checkLowWatermark(); } - void setWatermarks(uint32_t watermark) { - if (watermark != 0) { - setWatermarks(watermark / 2, watermark); - } - } + void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); } void setWatermarks(uint32_t low_watermark, uint32_t high_watermark); + uint32_t highWatermark() const { return high_watermark_; } private: void checkHighWatermark(); @@ -57,6 +54,8 @@ class WatermarkBuffer : public OwnedImpl { bool above_high_watermark_called_{false}; }; +typedef std::unique_ptr WatermarkBufferPtr; + class WatermarkBufferFactory : public WatermarkFactory { public: // Buffer::WatermarkFactory diff --git a/source/common/dynamo/dynamo_filter.cc b/source/common/dynamo/dynamo_filter.cc index e93373c1a51ef..557a02f9b3623 100644 --- a/source/common/dynamo/dynamo_filter.cc +++ b/source/common/dynamo/dynamo_filter.cc @@ -36,6 +36,7 @@ Http::FilterDataStatus DynamoFilter::decodeData(Buffer::Instance& data, bool end if (end_stream) { return Http::FilterDataStatus::Continue; } else { + // Buffer until the complete request has been processed. return Http::FilterDataStatus::StopIterationAndBuffer; } } @@ -110,6 +111,7 @@ Http::FilterDataStatus DynamoFilter::encodeData(Buffer::Instance& data, bool end if (end_stream) { return Http::FilterDataStatus::Continue; } else { + // Buffer until the complete response has been processed. return Http::FilterDataStatus::StopIterationAndBuffer; } } diff --git a/source/common/grpc/grpc_web_filter.cc b/source/common/grpc/grpc_web_filter.cc index 3493725934697..da7968da34dfa 100644 --- a/source/common/grpc/grpc_web_filter.cc +++ b/source/common/grpc/grpc_web_filter.cc @@ -2,6 +2,7 @@ #include +#include "common/common/assert.h" #include "common/common/base64.h" #include "common/common/empty_string.h" #include "common/common/utility.h" @@ -103,6 +104,8 @@ Http::FilterDataStatus GrpcWebFilter::decodeData(Buffer::Instance& data, bool) { decoding_buffer_.drain(decoding_buffer_.length()); decoding_buffer_.move(data); data.add(decoded); + // Any block of 4 bytes or more should have been decoded and passed through. + ASSERT(decoding_buffer_.length() < 4); return Http::FilterDataStatus::Continue; } @@ -189,9 +192,9 @@ Http::FilterTrailersStatus GrpcWebFilter::encodeTrailers(Http::HeaderMap& traile buffer.move(temp); if (is_text_response_) { Buffer::OwnedImpl encoded(Base64::encode(buffer, buffer.length())); - encoder_callbacks_->addEncodedData(encoded); + encoder_callbacks_->addEncodedData(encoded, true); } else { - encoder_callbacks_->addEncodedData(buffer); + encoder_callbacks_->addEncodedData(buffer, true); } return Http::FilterTrailersStatus::Continue; } diff --git a/source/common/grpc/http1_bridge_filter.cc b/source/common/grpc/http1_bridge_filter.cc index b98b3b83a5673..409928faaa59f 100644 --- a/source/common/grpc/http1_bridge_filter.cc +++ b/source/common/grpc/http1_bridge_filter.cc @@ -52,6 +52,7 @@ Http::FilterDataStatus Http1BridgeFilter::encodeData(Buffer::Instance&, bool end if (!do_bridging_ || end_stream) { return Http::FilterDataStatus::Continue; } else { + // Buffer until the complete request has been processed. return Http::FilterDataStatus::StopIterationAndBuffer; } } diff --git a/source/common/grpc/json_transcoder_filter.cc b/source/common/grpc/json_transcoder_filter.cc index d230d7f54fd88..ae78d9c9bdeba 100644 --- a/source/common/grpc/json_transcoder_filter.cc +++ b/source/common/grpc/json_transcoder_filter.cc @@ -226,7 +226,7 @@ Http::FilterHeadersStatus JsonTranscoderFilter::decodeHeaders(Http::HeaderMap& h readToBuffer(*transcoder_->RequestOutput(), data); if (data.length() > 0) { - decoder_callbacks_->addDecodedData(data); + decoder_callbacks_->addDecodedData(data, true); } } return Http::FilterHeadersStatus::Continue; @@ -273,7 +273,7 @@ Http::FilterTrailersStatus JsonTranscoderFilter::decodeTrailers(Http::HeaderMap& readToBuffer(*transcoder_->RequestOutput(), data); if (data.length()) { - decoder_callbacks_->addDecodedData(data); + decoder_callbacks_->addDecodedData(data, true); } return Http::FilterTrailersStatus::Continue; } @@ -311,6 +311,7 @@ Http::FilterDataStatus JsonTranscoderFilter::encodeData(Buffer::Instance& data, readToBuffer(*transcoder_->ResponseOutput(), data); if (!method_->server_streaming()) { + // Buffer until the response is complete. return Http::FilterDataStatus::StopIterationAndBuffer; } // TODO(lizan): Check ResponseStatus @@ -329,7 +330,7 @@ Http::FilterTrailersStatus JsonTranscoderFilter::encodeTrailers(Http::HeaderMap& readToBuffer(*transcoder_->ResponseOutput(), data); if (data.length()) { - encoder_callbacks_->addEncodedData(data); + encoder_callbacks_->addEncodedData(data, true); } if (method_->server_streaming()) { diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index dbe1234101813..36999e7a1382f 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -190,7 +190,7 @@ class AsyncStreamImpl : public AsyncClient::Stream, Tracing::Span& activeSpan() override { return active_span_; } const std::string& downstreamAddress() override { return EMPTY_STRING; } void continueDecoding() override { NOT_IMPLEMENTED; } - void addDecodedData(Buffer::Instance&) override { NOT_IMPLEMENTED; } + void addDecodedData(Buffer::Instance&, bool) override { NOT_IMPLEMENTED; } const Buffer::Instance* decodingBuffer() override { throw EnvoyException("buffering is not supported in streaming"); } @@ -201,6 +201,8 @@ class AsyncStreamImpl : public AsyncClient::Stream, void onDecoderFilterBelowWriteBufferLowWatermark() override {} void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {} void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {} + void setDecoderBufferLimit(uint32_t) override {} + uint32_t decoderBufferLimit() override { return 0; } AsyncClient::StreamCallbacks& stream_callbacks_; const uint64_t stream_id_; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 6e94ed743c9e3..b4d057da03ac7 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -152,6 +152,7 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream) { } void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) { + stream.state_.destroyed_ = true; for (auto& filter : stream.decoder_filters_) { filter->handle_->onDestroy(); } @@ -175,6 +176,7 @@ StreamDecoder& ConnectionManagerImpl::newStream(StreamEncoder& response_encoder) ActiveStreamPtr new_stream(new ActiveStream(*this)); new_stream->response_encoder_ = &response_encoder; new_stream->response_encoder_->getStream().addCallbacks(*new_stream); + new_stream->buffer_limit_ = new_stream->response_encoder_->getStream().bufferLimit(); config_.filterFactory().createFilterChain(*new_stream); // Make sure new streams are apprised that the underlying connection is blocked. if (read_callbacks_->connection().aboveHighWatermark()) { @@ -625,16 +627,18 @@ void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter* state_.filter_call_state_ &= ~FilterCallState::DecodeData; ENVOY_STREAM_LOG(trace, "decode data called: filter={} status={}", *this, static_cast((*entry).get()), static_cast(status)); - if (!(*entry)->commonHandleAfterDataCallback(status, data)) { + if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_)) { return; } } } void ConnectionManagerImpl::ActiveStream::addDecodedData(ActiveStreamDecoderFilter& filter, - Buffer::Instance& data) { + Buffer::Instance& data, bool streaming) { if (state_.filter_call_state_ == 0 || (state_.filter_call_state_ & FilterCallState::DecodeHeaders)) { + // Make sure if this triggers watermarks, the correct action is taken. + state_.decoder_filters_streaming_ = streaming; // If no call is happening or we are in the decode headers callback, buffer the data. Inline // processing happens in the decodeHeaders() callback if necessary. filter.commonHandleBufferData(data); @@ -803,9 +807,11 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte } void ConnectionManagerImpl::ActiveStream::addEncodedData(ActiveStreamEncoderFilter& filter, - Buffer::Instance& data) { + Buffer::Instance& data, bool streaming) { if (state_.filter_call_state_ == 0 || (state_.filter_call_state_ & FilterCallState::EncodeHeaders)) { + // Make sure if this triggers watermarks, the correct action is taken. + state_.encoder_filters_streaming_ = streaming; // If no call is happening or we are in the decode headers callback, buffer the data. Inline // processing happens in the decodeHeaders() callback if necessary. filter.commonHandleBufferData(data); @@ -830,7 +836,7 @@ void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter* state_.filter_call_state_ &= ~FilterCallState::EncodeData; ENVOY_STREAM_LOG(trace, "encode data called: filter={} status={}", *this, static_cast((*entry).get()), static_cast(status)); - if (!(*entry)->commonHandleAfterDataCallback(status, data)) { + if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.encoder_filters_streaming_)) { return; } } @@ -923,6 +929,16 @@ void ConnectionManagerImpl::ActiveStream::callLowWatermarkCallbacks() { } } +void ConnectionManagerImpl::ActiveStream::setBufferLimit(uint32_t new_limit) { + buffer_limit_ = new_limit; + if (buffered_request_data_) { + buffered_request_data_->setWatermarks(buffer_limit_); + } + if (buffered_response_data_) { + buffered_response_data_->setWatermarks(buffer_limit_); + } +} + void ConnectionManagerImpl::ActiveStreamFilterBase::commonContinue() { // TODO(mattklein123): Raise an error if this is called during a callback. ENVOY_STREAM_LOG(trace, "continuing filter chain: filter={}", parent_, @@ -976,14 +992,14 @@ void ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleBufferData( // rebuffer, because we assume the filter has modified the buffer as it wishes in place. if (bufferedData().get() != &provided_data) { if (!bufferedData()) { - bufferedData().reset(new Buffer::OwnedImpl()); + bufferedData() = createBuffer(); } bufferedData()->move(provided_data); } } bool ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleAfterDataCallback( - FilterDataStatus status, Buffer::Instance& provided_data) { + FilterDataStatus status, Buffer::Instance& provided_data, bool& buffer_was_streaming) { if (status == FilterDataStatus::Continue) { if (stopped_) { @@ -995,7 +1011,9 @@ bool ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleAfterDataCallbac } } else { stopped_ = true; - if (status == FilterDataStatus::StopIterationAndBuffer) { + if (status == FilterDataStatus::StopIterationAndBuffer || + status == FilterDataStatus::StopIterationAndWatermark) { + buffer_was_streaming = status == FilterDataStatus::StopIterationAndWatermark; commonHandleBufferData(provided_data); } @@ -1051,8 +1069,17 @@ void ConnectionManagerImpl::ActiveStreamFilterBase::clearRouteCache() { parent_.cached_route_ = Optional(); } -void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::Instance& data) { - parent_.addDecodedData(*this, data); +Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamDecoderFilter::createBuffer() { + auto buffer = Buffer::WatermarkBufferPtr{ + new Buffer::WatermarkBuffer([this]() -> void { this->requestDataDrained(); }, + [this]() -> void { this->requestDataTooLarge(); })}; + buffer->setWatermarks(parent_.buffer_limit_); + return buffer; +} + +void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::Instance& data, + bool streaming) { + parent_.addDecodedData(*this, data, streaming); } void ConnectionManagerImpl::ActiveStreamDecoderFilter::continueDecoding() { commonContinue(); } @@ -1080,6 +1107,22 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter:: parent_.connection_manager_.stats_.named_.downstream_flow_control_paused_reading_total_.inc(); } +void ConnectionManagerImpl::ActiveStreamDecoderFilter::requestDataTooLarge() { + if (parent_.state_.decoder_filters_streaming_) { + onDecoderFilterAboveWriteBufferHighWatermark(); + } else { + parent_.connection_manager_.stats_.named_.downstream_rq_too_large_.inc(); + Http::Utility::sendLocalReply(*this, parent_.state_.destroyed_, Http::Code::PayloadTooLarge, + CodeUtility::toString(Http::Code::PayloadTooLarge)); + } +} + +void ConnectionManagerImpl::ActiveStreamDecoderFilter::requestDataDrained() { + // If this is called it means the call to requestDataTooLarge() was a + // streaming call, or a 413 would have been sent. + onDecoderFilterBelowWriteBufferLowWatermark(); +} + void ConnectionManagerImpl::ActiveStreamDecoderFilter:: onDecoderFilterBelowWriteBufferLowWatermark() { ENVOY_STREAM_LOG(debug, "Read-enabling downstream stream due to filter callbacks.", parent_); @@ -1105,8 +1148,16 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::removeDownstreamWatermark parent_.watermark_callbacks_ = nullptr; } -void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data) { - return parent_.addEncodedData(*this, data); +Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamEncoderFilter::createBuffer() { + auto buffer = new Buffer::WatermarkBuffer([this]() -> void { this->responseDataDrained(); }, + [this]() -> void { this->responseDataTooLarge(); }); + buffer->setWatermarks(parent_.buffer_limit_); + return Buffer::WatermarkBufferPtr{buffer}; +} + +void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::Instance& data, + bool streaming) { + return parent_.addEncodedData(*this, data, streaming); } void ConnectionManagerImpl::ActiveStreamEncoderFilter:: @@ -1123,6 +1174,39 @@ void ConnectionManagerImpl::ActiveStreamEncoderFilter:: void ConnectionManagerImpl::ActiveStreamEncoderFilter::continueEncoding() { commonContinue(); } +void ConnectionManagerImpl::ActiveStreamEncoderFilter::responseDataTooLarge() { + if (parent_.state_.encoder_filters_streaming_) { + onEncoderFilterAboveWriteBufferHighWatermark(); + } else { + // If headers have not been sent to the user, send a 500. + if (!headers_continued_) { + // Make sure we won't end up with nested watermark calls from the body buffer. + parent_.state_.encoder_filters_streaming_ = true; + stopped_ = false; + + parent_.connection_manager_.stats_.named_.rs_too_large_.inc(); + Http::Utility::sendLocalReply( + [&](HeaderMapPtr&& response_headers, bool end_stream) -> void { + parent_.response_headers_ = std::move(response_headers); + parent_.response_encoder_->encodeHeaders(*parent_.response_headers_, end_stream); + }, + [&](Buffer::Instance& data, bool end_stream) -> void { + parent_.response_encoder_->encodeData(data, end_stream); + parent_.state_.local_complete_ = end_stream; + parent_.maybeEndEncode(end_stream); + }, + parent_.state_.destroyed_, Http::Code::InternalServerError, + CodeUtility::toString(Http::Code::InternalServerError)); + } else { + resetStream(); + } + } +} + +void ConnectionManagerImpl::ActiveStreamEncoderFilter::responseDataDrained() { + onEncoderFilterBelowWriteBufferLowWatermark(); +} + void ConnectionManagerImpl::ActiveStreamFilterBase::resetStream() { parent_.connection_manager_.stats_.named_.downstream_rq_tx_reset_.inc(); parent_.connection_manager_.doEndStream(this->parent_); diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 980d045b4fc58..8f23dc76285f2 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -23,6 +23,7 @@ #include "envoy/tracing/http_tracer.h" #include "envoy/upstream/upstream.h" +#include "common/buffer/watermark_buffer.h" #include "common/common/linked_object.h" #include "common/http/access_log/request_info_impl.h" #include "common/http/date_provider.h" @@ -74,11 +75,13 @@ namespace Http { COUNTER(downstream_rq_non_relative_path) \ COUNTER(downstream_rq_ws_on_non_ws_route) \ COUNTER(downstream_rq_non_ws_on_ws_route) \ + COUNTER(downstream_rq_too_large) \ COUNTER(downstream_rq_2xx) \ COUNTER(downstream_rq_3xx) \ COUNTER(downstream_rq_4xx) \ COUNTER(downstream_rq_5xx) \ - TIMER (downstream_rq_time) + TIMER (downstream_rq_time) \ + COUNTER(rs_too_large) // clang-format on /** @@ -307,11 +310,13 @@ class ConnectionManagerImpl : Logger::Loggable, bool commonHandleAfterHeadersCallback(FilterHeadersStatus status); void commonHandleBufferData(Buffer::Instance& provided_data); - bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data); + bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data, + bool& buffer_was_streaming); bool commonHandleAfterTrailersCallback(FilterTrailersStatus status); void commonContinue(); - virtual Buffer::InstancePtr& bufferedData() PURE; + virtual Buffer::WatermarkBufferPtr createBuffer() PURE; + virtual Buffer::WatermarkBufferPtr& bufferedData() PURE; virtual bool complete() PURE; virtual void doHeaders(bool end_stream) PURE; virtual void doData(bool end_stream) PURE; @@ -346,7 +351,8 @@ class ConnectionManagerImpl : Logger::Loggable, : ActiveStreamFilterBase(parent, dual_filter), handle_(filter) {} // ActiveStreamFilterBase - Buffer::InstancePtr& bufferedData() override { return parent_.buffered_request_data_; } + Buffer::WatermarkBufferPtr createBuffer() override; + Buffer::WatermarkBufferPtr& bufferedData() override { return parent_.buffered_request_data_; } bool complete() override { return parent_.state_.remote_complete_; } void doHeaders(bool end_stream) override { parent_.decodeHeaders(this, *parent_.request_headers_, end_stream); @@ -358,7 +364,7 @@ class ConnectionManagerImpl : Logger::Loggable, const HeaderMapPtr& trailers() override { return parent_.request_trailers_; } // Http::StreamDecoderFilterCallbacks - void addDecodedData(Buffer::Instance& data) override; + void addDecodedData(Buffer::Instance& data, bool streaming) override; void continueDecoding() override; const Buffer::Instance* decodingBuffer() override { return parent_.buffered_request_data_.get(); @@ -372,6 +378,11 @@ class ConnectionManagerImpl : Logger::Loggable, addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& watermark_callbacks) override; void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& watermark_callbacks) override; + void setDecoderBufferLimit(uint32_t limit) override { parent_.setBufferLimit(limit); } + uint32_t decoderBufferLimit() override { return parent_.buffer_limit_; } + + void requestDataTooLarge(); + void requestDataDrained(); StreamDecoderFilterSharedPtr handle_; }; @@ -389,7 +400,8 @@ class ConnectionManagerImpl : Logger::Loggable, : ActiveStreamFilterBase(parent, dual_filter), handle_(filter) {} // ActiveStreamFilterBase - Buffer::InstancePtr& bufferedData() override { return parent_.buffered_response_data_; } + Buffer::WatermarkBufferPtr createBuffer() override; + Buffer::WatermarkBufferPtr& bufferedData() override { return parent_.buffered_response_data_; } bool complete() override { return parent_.state_.local_complete_; } void doHeaders(bool end_stream) override { parent_.encodeHeaders(this, *parent_.response_headers_, end_stream); @@ -401,14 +413,19 @@ class ConnectionManagerImpl : Logger::Loggable, const HeaderMapPtr& trailers() override { return parent_.response_trailers_; } // Http::StreamEncoderFilterCallbacks - void addEncodedData(Buffer::Instance& data) override; + void addEncodedData(Buffer::Instance& data, bool streaming) override; void onEncoderFilterAboveWriteBufferHighWatermark() override; void onEncoderFilterBelowWriteBufferLowWatermark() override; + void setEncoderBufferLimit(uint32_t limit) override { parent_.setBufferLimit(limit); } + uint32_t encoderBufferLimit() override { return parent_.buffer_limit_; } void continueEncoding() override; const Buffer::Instance* encodingBuffer() override { return parent_.buffered_response_data_.get(); } + void responseDataTooLarge(); + void responseDataDrained(); + StreamEncoderFilterSharedPtr handle_; }; @@ -436,11 +453,11 @@ class ConnectionManagerImpl : Logger::Loggable, uint64_t connectionId(); const Network::Connection* connection(); Ssl::Connection* ssl(); - void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data); + void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data, bool streaming); void decodeHeaders(ActiveStreamDecoderFilter* filter, HeaderMap& headers, bool end_stream); void decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, bool end_stream); void decodeTrailers(ActiveStreamDecoderFilter* filter, HeaderMap& trailers); - void addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data); + void addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data, bool streaming); void encodeHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers, bool end_stream); void encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data, bool end_stream); void encodeTrailers(ActiveStreamEncoderFilter* filter, HeaderMap& trailers); @@ -503,21 +520,30 @@ class ConnectionManagerImpl : Logger::Loggable, State() : remote_complete_(false), local_complete_(false), saw_connection_close_(false) {} uint32_t filter_call_state_{0}; + // The following 3 members are booleans rather than part of the space-saving bitfield as they + // are passed as arguments to functions expecting bools. Extend State using the bitfield + // where possible. + bool encoder_filters_streaming_{true}; + bool decoder_filters_streaming_{true}; + bool destroyed_{false}; bool remote_complete_ : 1; bool local_complete_ : 1; bool saw_connection_close_ : 1; }; + // Possibly increases buffer_limit_ to the value of limit. + void setBufferLimit(uint32_t limit); + ConnectionManagerImpl& connection_manager_; Router::ConfigConstSharedPtr snapped_route_config_; Tracing::SpanPtr active_span_{new Tracing::NullSpan()}; const uint64_t stream_id_; StreamEncoder* response_encoder_{}; HeaderMapPtr response_headers_; - Buffer::InstancePtr buffered_response_data_; // TODO(mattklein123): buffer data stat + Buffer::WatermarkBufferPtr buffered_response_data_; HeaderMapPtr response_trailers_{}; HeaderMapPtr request_headers_; - Buffer::InstancePtr buffered_request_data_; // TODO(mattklein123): buffer data stat + Buffer::WatermarkBufferPtr buffered_request_data_; HeaderMapPtr request_trailers_; std::list decoder_filters_; std::list encoder_filters_; @@ -528,6 +554,7 @@ class ConnectionManagerImpl : Logger::Loggable, std::string downstream_address_; Optional cached_route_; DownstreamWatermarkCallbacks* watermark_callbacks_{nullptr}; + uint32_t buffer_limit_{0}; uint32_t high_watermark_count_{0}; }; diff --git a/source/common/http/filter/BUILD b/source/common/http/filter/BUILD index 4d7bb76a3f39d..ded086393c138 100644 --- a/source/common/http/filter/BUILD +++ b/source/common/http/filter/BUILD @@ -22,8 +22,10 @@ envoy_cc_library( "//source/common/buffer:buffer_lib", "//source/common/common:assert_lib", "//source/common/common:enum_to_int", + "//source/common/http:codes_lib", "//source/common/http:header_map_lib", "//source/common/http:headers_lib", + "//source/common/http:utility_lib", ], ) diff --git a/source/common/http/filter/buffer_filter.cc b/source/common/http/filter/buffer_filter.cc index 48c5f313cbfdc..a044b2110375d 100644 --- a/source/common/http/filter/buffer_filter.cc +++ b/source/common/http/filter/buffer_filter.cc @@ -9,8 +9,10 @@ #include "common/common/assert.h" #include "common/common/enum_to_int.h" +#include "common/http/codes.h" #include "common/http/header_map_impl.h" #include "common/http/headers.h" +#include "common/http/utility.h" namespace Envoy { namespace Http { @@ -36,15 +38,9 @@ FilterDataStatus BufferFilter::decodeData(Buffer::Instance&, bool end_stream) { if (end_stream) { resetInternalState(); return FilterDataStatus::Continue; - } else if (callbacks_->decodingBuffer() && - callbacks_->decodingBuffer()->length() > config_->max_request_bytes_) { - // TODO(htuch): Switch this to Utility::sendLocalReply(). - Http::HeaderMapPtr response_headers{new HeaderMapImpl{ - {Headers::get().Status, std::to_string(enumToInt(Http::Code::PayloadTooLarge))}}}; - callbacks_->encodeHeaders(std::move(response_headers), true); - config_->stats_.rq_too_large_.inc(); } + // Buffer until the complete request has been processed or the ConnectionManagerImpl sends a 413. return FilterDataStatus::StopIterationAndBuffer; } @@ -72,6 +68,7 @@ void BufferFilter::resetInternalState() { request_timeout_.reset(); } void BufferFilter::setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) { callbacks_ = &callbacks; + callbacks_->setDecoderBufferLimit(config_->max_request_bytes_); } } // namespace Http diff --git a/source/common/http/filter/buffer_filter.h b/source/common/http/filter/buffer_filter.h index 23db7d515097e..2d90f1f81543a 100644 --- a/source/common/http/filter/buffer_filter.h +++ b/source/common/http/filter/buffer_filter.h @@ -18,8 +18,7 @@ namespace Http { */ // clang-format off #define ALL_BUFFER_FILTER_STATS(COUNTER) \ - COUNTER(rq_timeout) \ - COUNTER(rq_too_large) + COUNTER(rq_timeout) // clang-format on /** diff --git a/source/common/http/filter/fault_filter.cc b/source/common/http/filter/fault_filter.cc index f323175651ee6..951291c7aa5f1 100644 --- a/source/common/http/filter/fault_filter.cc +++ b/source/common/http/filter/fault_filter.cc @@ -211,8 +211,11 @@ void FaultFilter::recordAbortsInjectedStats() { } FilterDataStatus FaultFilter::decodeData(Buffer::Instance&, bool) { - return delay_timer_ == nullptr ? FilterDataStatus::Continue - : FilterDataStatus::StopIterationAndBuffer; + if (delay_timer_ == nullptr) { + return FilterDataStatus::Continue; + } + // If the request is too large, stop reading new data until the buffer drains. + return FilterDataStatus::StopIterationAndWatermark; } FilterTrailersStatus FaultFilter::decodeTrailers(HeaderMap&) { diff --git a/source/common/http/filter/ratelimit.cc b/source/common/http/filter/ratelimit.cc index c1086a3a4cbaa..0e596560d2620 100644 --- a/source/common/http/filter/ratelimit.cc +++ b/source/common/http/filter/ratelimit.cc @@ -84,8 +84,11 @@ FilterHeadersStatus Filter::decodeHeaders(HeaderMap& headers, bool) { FilterDataStatus Filter::decodeData(Buffer::Instance&, bool) { ASSERT(state_ != State::Responded); - return state_ == State::Calling ? FilterDataStatus::StopIterationAndBuffer - : FilterDataStatus::Continue; + if (state_ != State::Calling) { + return FilterDataStatus::Continue; + } + // If the request is too large, stop reading new data until the buffer drains. + return FilterDataStatus::StopIterationAndWatermark; } FilterTrailersStatus Filter::decodeTrailers(HeaderMap&) { diff --git a/source/common/http/filter/ratelimit.h b/source/common/http/filter/ratelimit.h index 80d34502875ff..cf472cca194e4 100644 --- a/source/common/http/filter/ratelimit.h +++ b/source/common/http/filter/ratelimit.h @@ -103,9 +103,9 @@ class Filter : public StreamDecoderFilter, public Envoy::RateLimit::RequestCallb FilterConfigSharedPtr config_; Envoy::RateLimit::ClientPtr client_; StreamDecoderFilterCallbacks* callbacks_{}; - bool initiating_call_{}; State state_{State::NotStarted}; Upstream::ClusterInfoConstSharedPtr cluster_; + bool initiating_call_{}; }; } // namespace RateLimit diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 4e6cd43e64563..bde1e140d0840 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -179,6 +179,8 @@ void StreamEncoderImpl::resetStream(StreamResetReason reason) { void StreamEncoderImpl::readDisable(bool disable) { connection_.readDisable(disable); } +uint32_t StreamEncoderImpl::bufferLimit() { return connection_.bufferLimit(); } + static const char RESPONSE_PREFIX[] = "HTTP/1.1 "; void ResponseStreamEncoderImpl::encodeHeaders(const HeaderMap& headers, bool end_stream) { diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index f41519cc5262d..20abd64444985 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -43,6 +43,7 @@ class StreamEncoderImpl : public StreamEncoder, void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacks_(callbacks); } void resetStream(StreamResetReason reason) override; void readDisable(bool disable) override; + uint32_t bufferLimit() override; protected: StreamEncoderImpl(ConnectionImpl& connection) : connection_(connection) {} diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 4f8419a87ac3e..f1b7e949c0f6c 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -157,6 +157,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable void { + callbacks.encodeHeaders(std::move(headers), end_stream); + }, + [&](Buffer::Instance& data, bool end_stream) -> void { + callbacks.encodeData(data, end_stream); + }, + is_reset, response_code, body_text); +} + +void Utility::sendLocalReply( + std::function encode_headers, + std::function encode_data, const bool& is_reset, + Code response_code, const std::string& body_text) { HeaderMapPtr response_headers{ new HeaderMapImpl{{Headers::get().Status, std::to_string(enumToInt(response_code))}}}; if (!body_text.empty()) { @@ -176,12 +190,12 @@ void Utility::sendLocalReply(StreamDecoderFilterCallbacks& callbacks, const bool response_headers->insertContentType().value(Headers::get().ContentTypeValues.Text); } - callbacks.encodeHeaders(std::move(response_headers), body_text.empty()); + encode_headers(std::move(response_headers), body_text.empty()); if (!body_text.empty() && !is_reset) { Buffer::OwnedImpl buffer(body_text); // TODO(htuch): We shouldn't encodeData() if the stream is reset in the encodeHeaders() above, // see https://github.com/lyft/envoy/issues/1283. - callbacks.encodeData(buffer, true); + encode_data(buffer, true); } } diff --git a/source/common/http/utility.h b/source/common/http/utility.h index be096dcbf60cd..e862da12e1882 100644 --- a/source/common/http/utility.h +++ b/source/common/http/utility.h @@ -97,6 +97,21 @@ class Utility { */ static void sendLocalReply(StreamDecoderFilterCallbacks& callbacks, const bool& is_reset, Code response_code, const std::string& body_text); + /** + * Create a locally generated response using the provided lambdas. + * @param encode_headers supplies the function to encode response headers. + * @param encode_data supplies the function to encode the response body. + * @param is_reset boolean reference that indicates whether a stream has been reset. It is the + * responsibility of the caller to ensure that this is set to false if onDestroy() + * is invoked in the context of sendLocalReply(). + * @param response_code supplies the HTTP response code. + * @param body_text supplies the optional body text which is sent using the text/plain content + * type. + */ + static void + sendLocalReply(std::function encode_headers, + std::function encode_data, + const bool& is_reset, Code response_code, const std::string& body_text); /** * Send a redirect response (301). diff --git a/source/common/router/BUILD b/source/common/router/BUILD index 47d91e8cb2fb6..ba8383b00f003 100644 --- a/source/common/router/BUILD +++ b/source/common/router/BUILD @@ -136,6 +136,7 @@ envoy_cc_library( "//include/envoy/stats:stats_macros", "//include/envoy/upstream:cluster_manager_interface", "//include/envoy/upstream:upstream_interface", + "//source/common/buffer:watermark_buffer_lib", "//source/common/common:assert_lib", "//source/common/common:empty_string", "//source/common/common:enum_to_int", diff --git a/source/common/router/router.cc b/source/common/router/router.cc index e330d91fefe6f..2b5333781b96e 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -26,6 +26,9 @@ namespace Envoy { namespace Router { +namespace { +uint32_t getLength(const Buffer::Instance* instance) { return instance ? instance->length() : 0; } +} // namespace void FilterUtility::setUpstreamScheme(Http::HeaderMap& headers, const Upstream::ClusterInfo& cluster) { @@ -277,6 +280,14 @@ void Filter::sendNoHealthyUpstreamResponse() { Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) { bool buffering = (retry_state_ && retry_state_->enabled()) || do_shadowing_; + if (buffering && buffer_limit_ > 0 && + getLength(callbacks_->decodingBuffer()) + data.length() > buffer_limit_) { + // The request is larger than we should buffer. Give up on the retry/shadow + cluster_->stats().retry_or_shadow_abandoned_.inc(); + retry_state_.reset(); + buffering = false; + do_shadowing_ = false; + } // If we are going to buffer for retries or shadowing, we need to make a copy before encoding // since it's all moves from here on. @@ -292,6 +303,8 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea } // If we are potentially going to retry or shadow this request we need to buffer. + // This will not cause the connection manager to 413 because before we hit the + // buffer limit we give up on retries and buffering. return buffering ? Http::FilterDataStatus::StopIterationAndBuffer : Http::FilterDataStatus::StopIterationNoBuffer; } @@ -303,6 +316,14 @@ Http::FilterTrailersStatus Filter::decodeTrailers(Http::HeaderMap& trailers) { return Http::FilterTrailersStatus::StopIteration; } +void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { + callbacks_ = &callbacks; + // As the decoder filter only pushes back via watermarks once data has reached + // it, it can latch the current buffer limit and does not need to update the + // limit if another filter increases it. + buffer_limit_ = callbacks_->decoderBufferLimit(); +} + void Filter::cleanup() { upstream_request_.reset(); retry_state_.reset(); @@ -651,7 +672,10 @@ void Filter::UpstreamRequest::encodeData(Buffer::Instance& data, bool end_stream if (!request_encoder_) { ENVOY_STREAM_LOG(trace, "buffering {} bytes", *parent_.callbacks_, data.length()); if (!buffered_request_body_) { - buffered_request_body_.reset(new Buffer::OwnedImpl()); + buffered_request_body_.reset( + new Buffer::WatermarkBuffer([this]() -> void { this->enableDataFromDownstream(); }, + [this]() -> void { this->disableDataFromDownstream(); })); + buffered_request_body_->setWatermarks(parent_.buffer_limit_); } buffered_request_body_->move(data); diff --git a/source/common/router/router.h b/source/common/router/router.h index 72a99b9b32a78..926abda2d6de5 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -14,6 +14,7 @@ #include "envoy/stats/stats_macros.h" #include "envoy/upstream/cluster_manager.h" +#include "common/buffer/watermark_buffer.h" #include "common/common/logger.h" namespace Envoy { @@ -121,9 +122,7 @@ class Filter : Logger::Loggable, Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override; Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override; Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap& trailers) override; - void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override { - callbacks_ = &callbacks; - } + void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override; // Upstream::LoadBalancerContext Optional hashKey() const override { @@ -139,6 +138,9 @@ class Filter : Logger::Loggable, return callbacks_->connection(); } +protected: + RetryStatePtr retry_state_; + private: struct UpstreamRequest : public Http::StreamDecoder, public Http::StreamCallbacks, @@ -168,13 +170,14 @@ class Filter : Logger::Loggable, // Http::StreamCallbacks void onResetStream(Http::StreamResetReason reason) override; - void onAboveWriteBufferHighWatermark() override { - // Have the connection manager disable reads on the downstream stream. + void onAboveWriteBufferHighWatermark() override { disableDataFromDownstream(); } + void onBelowWriteBufferLowWatermark() override { enableDataFromDownstream(); } + + void disableDataFromDownstream() { parent_.cluster_->stats().upstream_flow_control_backed_up_total_.inc(); parent_.callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); } - void onBelowWriteBufferLowWatermark() override { - // Have the connection manager enable reads on the downstream stream. + void enableDataFromDownstream() { parent_.cluster_->stats().upstream_flow_control_drained_total_.inc(); parent_.callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); } @@ -206,7 +209,7 @@ class Filter : Logger::Loggable, Http::ConnectionPool::Cancellable* conn_pool_stream_handle_{}; Http::StreamEncoder* request_encoder_{}; Optional deferred_reset_reason_; - Buffer::InstancePtr buffered_request_body_; + Buffer::WatermarkBufferPtr buffered_request_body_; Upstream::HostDescriptionConstSharedPtr upstream_host_; DownstreamWatermarkManager downstream_watermark_manager_{*this}; @@ -259,10 +262,10 @@ class Filter : Logger::Loggable, FilterUtility::TimeoutData timeout_; Http::Code timeout_response_code_ = Http::Code::GatewayTimeout; UpstreamRequestPtr upstream_request_; - RetryStatePtr retry_state_; Http::HeaderMap* downstream_headers_{}; Http::HeaderMap* downstream_trailers_{}; MonotonicTime downstream_request_complete_time_; + uint32_t buffer_limit_{0}; bool stream_destroyed_{}; bool downstream_response_started_ : 1; diff --git a/source/docs/flow_control.md b/source/docs/flow_control.md index b53e829877140..ca6876af8c036 100644 --- a/source/docs/flow_control.md +++ b/source/docs/flow_control.md @@ -84,7 +84,7 @@ disable further data from downstream. On the reverse path, when the downstream backs up, the connection manager collects events for the downstream streams and the downstream conection. It passes events to the router filter via `Envoy::Http::DownstreamWatermarkCallbacks` and the router can then call `readDisable()` on the -upstream stream. The API for provideWatermarkCallbacks is a performance +upstream stream. Filters opt into subscribing to `DownstreamWatermarkCallbacks` as a performance optimization to avoid each watermark event on a downstream HTTP/2 connection resulting in "number of streams * number of filters" callbacks. Instead, only the router filter is notified and only the "number of streams" multiplier applies. Because @@ -115,9 +115,95 @@ The low watermark path is similar `ConnectionImpl::StreamImpl::pendingRecvBufferLowWatermark`. * `pendingRecvBufferLowWatermarkwhich` calls `readDisable(false)` on the stream. -## HTTP/2 filters +## HTTP/1 and HTTP/2 filters + +Each HTTP and HTTP/2 filter has an opportunity to call `decoderBufferLimit()` or +`encoderBufferLimit()` on creation. No filter should buffer more than the +configured bytes without calling the appropriate watermark callbacks or sending +an error response. + +Filters may override the default limit with calls to `setDecoderBufferLimit()` +and `setEncoderBufferLimit()`. These limits are applied as filters are creaeted +so filters later in the chain can override the limits set by prior filters. + +Most filters do not buffer internally, but instead push back on data by +returning a FilterDataStatus on `encodeData()`/`decodeData()` calls. +If a buffer is a streaming buffer, i.e. the buffered data will resolve over +time, it should return `FilterDataStatus::StopIterationAndWatermark` to pause +further data processing, which will cause the `ConnectionManagerImpl` to trigger +watermark callbacks on behalf of the filter. If a filter can not make forward progress without the +complete body, it should return `FilterDataStatus::StopIterationAndBuffer`. +in this case if the `ConnectionManagerImpl` buffers more than the allowed data +it will return an error downstream: a 413 on the request path, 500 or `resetStream()` on the +response path. + +# Decoder filters + +For filters which do their own internal buffering, filters buffering more than the buffer limit +should call `onDecoderFilterAboveWriteBufferHighWatermark` if they are streaming filters, i.e. +filters which can process more bytes as the underlying buffer is drained. This causes the +downstream stream to be readDisabled and the flow of downstream data to be +halted. The filter is then responsible for calling `onDecoderFilterBelowWriteBufferLowWatermark` +when the buffer is drained to resume the flow of data. + +Decoder filters which must buffer the full response should respond with a 413 (Payload Too Large) +when encountering a response body too large to buffer. + +The decoder high watermark path for streaming filters is as follows: + + * When an instance of `Envoy::Router::StreamDecoderFilter` buffers too much data it should call + `StreamDecoderFilterCallback::onDecoderFilterAboveWriteBufferHighWatermark()`. + * When `Envoy::Http::ConnectionManagerImpl::ActiveStreamDecoderFilter` receives + `onDecoderFilterAboveWriteBufferHighWatermark()` it calls `readDisable(true)` on the downstream + stream to pause data. + +And the low watermark path: + + * When the buffer of the `Envoy::Router::StreamDecoderFilter` drains should call + `StreamDecoderFilterCallback::onDecoderFilterBelowWriteBufferLowWatermark()`. + * When `Envoy::Http::ConnectionManagerImpl` receives + `onDecoderFilterAboveWriteBufferHighWatermark()` it calls `readDisable(false)` on the downstream + stream to pause data. -TODO(alyssawilk) implement and document. +# Encoder filters + + +Encoder filters buffering more than the buffer limit should call +`onEncoderFilterAboveWriteBufferHighWatermark` if they are streaming filters, i.e. filters which can +process more bytes as the underlying buffer is drained. The high watermark +call will be passed from the `Envoy::Http::ConnectionManagerImpl` to the `Envoy::Router::Filter` +which will `readDisable(true)` to stop the flow of upstream data. Streaming filters which +call `onEncoderFilterAboveWriteBufferHighWatermark` should call +`onEncoderFilterBelowWriteBufferLowWatermark` when the underlying buffer drains. + +Filters which must buffer a full request body before processing further, should respond with a +500 (Server Error) if encountering a request body which is larger than the buffer limits. + +The encoder high watermark path for streaming filters is as follows: + + * When an instance of `Envoy::Router::StreamEncoderFilter` buffers too much data it should call + `StreamEncoderFilterCallback::onEncodeFilterAboveWriteBufferHighWatermark()`. + * When `Envoy::Http::ConnectionManagerImpl::ActiveStreamEncoderFilter` receives + `onEncoderFilterAboveWriteBufferHighWatermark()` it calls + `ConnectionManagerImpl::ActiveStream::callHighWatermarkCallbacks()` + * `callHighWatermarkCallbacks()` then in turn calls + `DownstreamWatermarkCallbacks::onAboveWriteBufferHighWatermark()` for all + filters which registered to receive watermark events + * `Envoy::Router::Filter` receives `onAboveWriteBufferHighWatermark()` and calls + `readDisable(false)` on the upstream request. + +The encoder low watermark path for streaming filters is as follows: + + * When an instance of `Envoy::Router::StreamEncoderFilter` buffers too much data it should call + `StreamEncoderFilterCallback::onEncodeFilterBelowWriteBufferLowWatermark()`. + * When `Envoy::Http::ConnectionManagerImpl::ActiveStreamEncoderFilter` receives + `onEncoderFilterBelowWriteBufferLowWatermark()` it calls + `ConnectionManagerImpl::ActiveStream::callLowWatermarkCallbacks()` + * `callLowWatermarkCallbacks()` then in turn calls + `DownstreamWatermarkCallbacks::onBelowWriteBufferLowWatermark()` for all + filters which registered to receive watermark events + * `Envoy::Router::Filter` receives `onBelowWriteBufferLowWatermark()` and calls + `readDisable(true)` on the upstream request. # HTTP and HTTP/2 codec upstream send buffer diff --git a/test/common/buffer/watermark_buffer_test.cc b/test/common/buffer/watermark_buffer_test.cc index 668b80ebbf5be..bbfc0da58de7a 100644 --- a/test/common/buffer/watermark_buffer_test.cc +++ b/test/common/buffer/watermark_buffer_test.cc @@ -18,6 +18,7 @@ class WatermarkBufferTest : public testing::Test { uint32_t times_low_watermark_called_{0}; uint32_t times_high_watermark_called_{0}; }; +TEST_F(WatermarkBufferTest, TestWatermark) { ASSERT_EQ(10, buffer_.highWatermark()); } TEST_F(WatermarkBufferTest, AddChar) { buffer_.add(TEN_BYTES, 10); @@ -148,6 +149,13 @@ TEST_F(WatermarkBufferTest, MoveWatermarks) { buffer_.setWatermarks(8, 20); buffer_.setWatermarks(10, 20); EXPECT_EQ(1, times_low_watermark_called_); + + EXPECT_EQ(1, times_high_watermark_called_); + buffer_.setWatermarks(2); + EXPECT_EQ(2, times_high_watermark_called_); + EXPECT_EQ(1, times_low_watermark_called_); + buffer_.setWatermarks(0); + EXPECT_EQ(2, times_low_watermark_called_); } TEST_F(WatermarkBufferTest, GetRawSlices) { diff --git a/test/common/grpc/grpc_web_filter_test.cc b/test/common/grpc/grpc_web_filter_test.cc index b0bb2510cc793..75704763ebed5 100644 --- a/test/common/grpc/grpc_web_filter_test.cc +++ b/test/common/grpc/grpc_web_filter_test.cc @@ -287,9 +287,8 @@ TEST_P(GrpcWebFilterTest, Unary) { // Tests response trailers. Buffer::OwnedImpl trailers_buffer; - EXPECT_CALL(encoder_callbacks_, addEncodedData(_)).WillOnce(Invoke([&](Buffer::Instance& data) { - trailers_buffer.move(data); - })); + EXPECT_CALL(encoder_callbacks_, addEncodedData(_, true)) + .WillOnce(Invoke([&](Buffer::Instance& data, bool) { trailers_buffer.move(data); })); Http::TestHeaderMapImpl response_trailers; response_trailers.addCopy(Http::Headers::get().GrpcStatus, "0"); response_trailers.addCopy(Http::Headers::get().GrpcMessage, "ok"); diff --git a/test/common/http/BUILD b/test/common/http/BUILD index cecfefe1ce9fa..a1668f2b58cc0 100644 --- a/test/common/http/BUILD +++ b/test/common/http/BUILD @@ -170,6 +170,8 @@ envoy_cc_test( "//source/common/http:header_map_lib", "//source/common/http:utility_lib", "//source/common/network:address_lib", + "//test/mocks/http:http_mocks", + "//test/mocks/upstream:upstream_mocks", "//test/test_common:utility_lib", ], ) diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index b3f02379d805c..be3b01b71eef4 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -38,6 +38,7 @@ using testing::AnyNumber; using testing::AtLeast; +using testing::DoAll; using testing::InSequence; using testing::Invoke; using testing::InvokeWithoutArgs; @@ -139,6 +140,7 @@ class HttpConnectionManagerImplTest : public Test, public ConnectionManagerConfi EXPECT_CALL(stream_, addCallbacks(_)) .WillOnce(Invoke( [&](Http::StreamCallbacks& callbacks) -> void { stream_callbacks_ = &callbacks; })); + EXPECT_CALL(stream_, bufferLimit()).WillOnce(Return(initial_buffer_limit_)); EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_); HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}}}; @@ -150,16 +152,21 @@ class HttpConnectionManagerImplTest : public Test, public ConnectionManagerConfi EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true)) .WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus { Buffer::OwnedImpl data("hello"); - decoder_filters_[0]->callbacks_->addDecodedData(data); + decoder_filters_[0]->callbacks_->addDecodedData(data, true); return FilterHeadersStatus::Continue; })); + } + + void sendReqestHeadersAndData() { EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false)) .WillOnce(Return(FilterHeadersStatus::StopIteration)); - EXPECT_CALL(*decoder_filters_[1], decodeData(_, true)) - .WillOnce(Return(FilterDataStatus::StopIterationAndBuffer)); + auto status = streaming_filter_ ? FilterDataStatus::StopIterationAndWatermark + : FilterDataStatus::StopIterationAndBuffer; + EXPECT_CALL(*decoder_filters_[1], decodeData(_, true)).WillOnce(Return(status)); - // Kick off the incoming data. - Buffer::OwnedImpl fake_input("1234"); + // Kick off the incoming data. |fake_input| is not sent, but instead kicks + // off sending the headers and |data| queued up in setUpEncoderAndDecoder(). + Buffer::OwnedImpl fake_input("asdf"); conn_manager_->onData(fake_input); } @@ -256,6 +263,8 @@ class HttpConnectionManagerImplTest : public Test, public ConnectionManagerConfi MockStream stream_; Http::StreamCallbacks* stream_callbacks_{nullptr}; NiceMock cluster_manager_; + uint32_t initial_buffer_limit_; + bool streaming_filter_{false}; // TODO(mattklein123): Not all tests have been converted over to better setup. Convert the rest. MockStreamEncoder response_encoder_; @@ -1084,7 +1093,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback) { Buffer::OwnedImpl trailers_data("hello"); EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_)) .WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus { - decoder_filters_[0]->callbacks_->addDecodedData(trailers_data); + decoder_filters_[0]->callbacks_->addDecodedData(trailers_data, true); return FilterTrailersStatus::Continue; })); EXPECT_CALL(*decoder_filters_[1], decodeData(Ref(trailers_data), false)) @@ -1115,7 +1124,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInTrailersCallback) { decoder_filters_[1]->callbacks_->encodeData(response_body, false); EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_)) .WillOnce(InvokeWithoutArgs([&]() -> FilterTrailersStatus { - encoder_filters_[0]->callbacks_->addEncodedData(trailers_data); + encoder_filters_[0]->callbacks_->addEncodedData(trailers_data, true); return FilterTrailersStatus::Continue; })); EXPECT_CALL(*encoder_filters_[1], encodeData(Ref(trailers_data), false)) @@ -1145,7 +1154,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInline) { EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true)) .WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus { Buffer::OwnedImpl data("hello"); - decoder_filters_[0]->callbacks_->addDecodedData(data); + decoder_filters_[0]->callbacks_->addDecodedData(data, true); return FilterHeadersStatus::Continue; })); EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false)) @@ -1160,7 +1169,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyInline) { EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true)) .WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus { Buffer::OwnedImpl data("hello"); - encoder_filters_[0]->callbacks_->addEncodedData(data); + encoder_filters_[0]->callbacks_->addEncodedData(data, true); EXPECT_EQ(5UL, encoder_filters_[0]->callbacks_->encodingBuffer()->length()); return FilterHeadersStatus::Continue; })); @@ -1209,6 +1218,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterClearRouteCache) { TEST_F(HttpConnectionManagerImplTest, UpstreamWatermarkCallbacks) { setup(false, ""); setUpEncoderAndDecoder(); + sendReqestHeadersAndData(); // Mimic the upstream connection backing up. The router would call // onDecoderFilterAboveWriteBufferHighWatermark which should readDisable the stream and increment @@ -1254,6 +1264,7 @@ TEST_F(HttpConnectionManagerImplTest, UpstreamWatermarkCallbacks) { TEST_F(HttpConnectionManagerImplTest, DownstreamWatermarkCallbacks) { setup(false, ""); setUpEncoderAndDecoder(); + sendReqestHeadersAndData(); // Test what happens when there are no subscribers. conn_manager_->onAboveWriteBufferHighWatermark(); @@ -1315,12 +1326,158 @@ TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOn) { // callbacks immediately. EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true)); setUpEncoderAndDecoder(); + sendReqestHeadersAndData(); ASSERT_GE(decoder_filters_.size(), 1); MockDownstreamWatermarkCallbacks callbacks; EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks); } +TEST_F(HttpConnectionManagerImplTest, AlterFilterWatermarkLimits) { + initial_buffer_limit_ = 100; + setup(false, ""); + setUpEncoderAndDecoder(); + sendReqestHeadersAndData(); + + // Check initial limits. + EXPECT_EQ(initial_buffer_limit_, decoder_filters_[0]->callbacks_->decoderBufferLimit()); + EXPECT_EQ(initial_buffer_limit_, encoder_filters_[0]->callbacks_->encoderBufferLimit()); + + // Check lowering the limits. + decoder_filters_[0]->callbacks_->setDecoderBufferLimit(initial_buffer_limit_ - 1); + EXPECT_EQ(initial_buffer_limit_ - 1, decoder_filters_[0]->callbacks_->decoderBufferLimit()); + + // Check raising the limits. + decoder_filters_[0]->callbacks_->setDecoderBufferLimit(initial_buffer_limit_ + 1); + EXPECT_EQ(initial_buffer_limit_ + 1, decoder_filters_[0]->callbacks_->decoderBufferLimit()); + EXPECT_EQ(initial_buffer_limit_ + 1, encoder_filters_[0]->callbacks_->encoderBufferLimit()); + + // Verify turning off buffer limits works. + decoder_filters_[0]->callbacks_->setDecoderBufferLimit(0); + EXPECT_EQ(0, decoder_filters_[0]->callbacks_->decoderBufferLimit()); + + // Once the limits are turned off can be turned on again. + decoder_filters_[0]->callbacks_->setDecoderBufferLimit(1); + EXPECT_EQ(1, decoder_filters_[0]->callbacks_->decoderBufferLimit()); +} + +TEST_F(HttpConnectionManagerImplTest, HitFilterWatermarkLimits) { + initial_buffer_limit_ = 1; + streaming_filter_ = true; + setup(false, ""); + setUpEncoderAndDecoder(); + + // The filter is a streaming filter. Sending 4 bytes should hit the + // watermark limit and disable reads on the stream. + EXPECT_CALL(stream_, readDisable(true)); + sendReqestHeadersAndData(); + + // Change the limit so the buffered data is below the new watermark. The + // stream should be read-enabled + EXPECT_CALL(stream_, readDisable(false)); + int buffer_len = decoder_filters_[0]->callbacks_->decodingBuffer()->length(); + decoder_filters_[0]->callbacks_->setDecoderBufferLimit((buffer_len + 1) * 2); + + // Start the response + HeaderMapPtr response_headers{new TestHeaderMapImpl{{":status", "200"}}}; + EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::StopIteration)); + decoder_filters_[0]->callbacks_->encodeHeaders(std::move(response_headers), false); + + MockDownstreamWatermarkCallbacks callbacks; + decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks); + + // Now overload the buffer with response data. The downstream watermark + // callbacks should be called. + EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); + Buffer::OwnedImpl fake_response("A long enough string to go over watermarks"); + EXPECT_CALL(*encoder_filters_[0], encodeData(_, false)) + .WillOnce(Return(FilterDataStatus::StopIterationAndWatermark)); + decoder_filters_[0]->callbacks_->encodeData(fake_response, false); + + // Change the limit so the buffered data is below the new watermark. + buffer_len = encoder_filters_[0]->callbacks_->encodingBuffer()->length(); + EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()); + encoder_filters_[0]->callbacks_->setEncoderBufferLimit((buffer_len + 1) * 2); +} + +TEST_F(HttpConnectionManagerImplTest, HitRequestBufferLimits) { + initial_buffer_limit_ = 10; + streaming_filter_ = false; + setup(false, ""); + setUpEncoderAndDecoder(); + sendReqestHeadersAndData(); + + // Set the filter to be a buffering filter. Sending any data will hit the + // watermark limit and result in a 413 being sent to the user. + Http::TestHeaderMapImpl response_headers{ + {":status", "413"}, {"content-length", "17"}, {"content-type", "text/plain"}}; + EXPECT_CALL(*encoder_filters_[0], encodeHeaders(HeaderMapEqualRef(&response_headers), false)) + .WillOnce(Return(FilterHeadersStatus::StopIteration)); + EXPECT_CALL(*encoder_filters_[0], encodeData(_, true)) + .WillOnce(Return(FilterDataStatus::StopIterationAndWatermark)); + Buffer::OwnedImpl data("A longer string"); + decoder_filters_[0]->callbacks_->addDecodedData(data, false); +} + +TEST_F(HttpConnectionManagerImplTest, HitResponseBufferLimitsBeforeHeaders) { + initial_buffer_limit_ = 10; + setup(false, ""); + setUpEncoderAndDecoder(); + sendReqestHeadersAndData(); + + // Start the response without processing the request headers through all + // filters. + HeaderMapPtr response_headers{new TestHeaderMapImpl{{":status", "200"}}}; + EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::StopIteration)); + decoder_filters_[0]->callbacks_->encodeHeaders(std::move(response_headers), false); + + // Now overload the buffer with response data. The filter returns + // StopIterationAndBuffer, which will trigger an early response. + + expectOnDestroy(); + Http::TestHeaderMapImpl expected_response_headers{ + {":status", "500"}, {"content-length", "21"}, {"content-type", "text/plain"}}; + Buffer::OwnedImpl fake_response("A long enough string to go over watermarks"); + // Fake response starts doing through the filter. + EXPECT_CALL(*encoder_filters_[0], encodeData(_, false)) + .WillOnce(Return(FilterDataStatus::StopIterationAndBuffer)); + std::string response_body; + // The 500 goes directly to the encoder. + EXPECT_CALL(response_encoder_, + encodeHeaders(HeaderMapEqualRef(&expected_response_headers), false)); + EXPECT_CALL(response_encoder_, encodeData(_, true)).WillOnce(AddBufferToString(&response_body)); + decoder_filters_[0]->callbacks_->encodeData(fake_response, false); + EXPECT_EQ("Internal Server Error", response_body); +} + +TEST_F(HttpConnectionManagerImplTest, HitResponseBufferLimitsAfterHeaders) { + initial_buffer_limit_ = 10; + setup(false, ""); + setUpEncoderAndDecoder(); + sendReqestHeadersAndData(); + + // Start the response, and make sure the request headers are fully processed. + HeaderMapPtr response_headers{new TestHeaderMapImpl{{":status", "200"}}}; + EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + EXPECT_CALL(response_encoder_, encodeHeaders(_, false)); + decoder_filters_[0]->callbacks_->encodeHeaders(std::move(response_headers), false); + + // Now overload the buffer with response data. The filter returns + // StopIterationAndBuffer, which will trigger an early reset. + const std::string data = "A long enough string to go over watermarks"; + Buffer::OwnedImpl fake_response(data); + InSequence s; + EXPECT_CALL(*encoder_filters_[0], encodeData(_, false)) + .WillOnce(Return(FilterDataStatus::StopIterationAndBuffer)); + EXPECT_CALL(stream_, resetStream(_)); + decoder_filters_[0]->callbacks_->encodeData(fake_response, false); +} + TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) { InSequence s; setup(false, ""); @@ -1346,7 +1503,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) { .WillOnce(Return(FilterDataStatus::Continue)); Buffer::OwnedImpl data("hello"); - decoder_filters_[0]->callbacks_->addDecodedData(data); + decoder_filters_[0]->callbacks_->addDecodedData(data, true); decoder_filters_[0]->callbacks_->continueDecoding(); EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true)) @@ -1364,7 +1521,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) { expectOnDestroy(); Buffer::OwnedImpl data2("hello"); - encoder_filters_[0]->callbacks_->addEncodedData(data2); + encoder_filters_[0]->callbacks_->addEncodedData(data2, true); encoder_filters_[0]->callbacks_->continueEncoding(); } diff --git a/test/common/http/filter/buffer_filter_test.cc b/test/common/http/filter/buffer_filter_test.cc index 135f427c4eaf4..fe6ab0f52ba5e 100644 --- a/test/common/http/filter/buffer_filter_test.cc +++ b/test/common/http/filter/buffer_filter_test.cc @@ -78,27 +78,6 @@ TEST_F(BufferFilterTest, RequestTimeout) { EXPECT_EQ(1U, config_->stats_.rq_timeout_.value()); } -TEST_F(BufferFilterTest, RequestTooLarge) { - InSequence s; - - expectTimerCreate(); - - TestHeaderMapImpl headers; - EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_.decodeHeaders(headers, false)); - - Buffer::InstancePtr buffered_data(new Buffer::OwnedImpl("buffered")); - ON_CALL(callbacks_, decodingBuffer()).WillByDefault(Return(buffered_data.get())); - - Buffer::OwnedImpl data1("hello"); - config_->max_request_bytes_ = 1; - TestHeaderMapImpl response_headers{{":status", "413"}}; - EXPECT_CALL(callbacks_, encodeHeaders_(HeaderMapEqualRef(&response_headers), true)); - EXPECT_EQ(FilterDataStatus::StopIterationAndBuffer, filter_.decodeData(data1, false)); - - filter_.onDestroy(); - EXPECT_EQ(1U, config_->stats_.rq_too_large_.value()); -} - TEST_F(BufferFilterTest, TxResetAfterEndStream) { InSequence s; diff --git a/test/common/http/filter/fault_filter_test.cc b/test/common/http/filter/fault_filter_test.cc index 7198605eafdae..194661be9abf6 100644 --- a/test/common/http/filter/fault_filter_test.cc +++ b/test/common/http/filter/fault_filter_test.cc @@ -336,7 +336,7 @@ TEST_F(FaultFilterTest, FixedDelayNonZeroDuration) { .Times(0); EXPECT_CALL(filter_callbacks_, continueDecoding()); - EXPECT_EQ(FilterDataStatus::StopIterationAndBuffer, filter_->decodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::StopIterationAndWatermark, filter_->decodeData(data_, false)); EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->decodeTrailers(request_headers_)); timer_->callback_(); @@ -380,7 +380,7 @@ TEST_F(FaultFilterTest, DelayForDownstreamCluster) { setResponseFlag(Http::AccessLog::ResponseFlag::FaultInjected)) .Times(0); EXPECT_CALL(filter_callbacks_, continueDecoding()); - EXPECT_EQ(FilterDataStatus::StopIterationAndBuffer, filter_->decodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::StopIterationAndWatermark, filter_->decodeData(data_, false)); timer_->callback_(); @@ -630,7 +630,7 @@ TEST_F(FaultFilterTest, TimerResetAfterStreamReset) { EXPECT_CALL(filter_callbacks_, continueDecoding()).Times(0); EXPECT_EQ(0UL, config_->stats().aborts_injected_.value()); - EXPECT_EQ(FilterDataStatus::StopIterationAndBuffer, filter_->decodeData(data_, true)); + EXPECT_EQ(FilterDataStatus::StopIterationAndWatermark, filter_->decodeData(data_, true)); filter_->onDestroy(); } diff --git a/test/common/http/filter/ratelimit_test.cc b/test/common/http/filter/ratelimit_test.cc index f5227649eb972..0c0fbc3ac0b11 100644 --- a/test/common/http/filter/ratelimit_test.cc +++ b/test/common/http/filter/ratelimit_test.cc @@ -170,7 +170,7 @@ TEST_F(HttpRateLimitFilterTest, OkResponse) { request_headers_.addCopy(Http::Headers::get().RequestId, "requestid"); request_headers_.addCopy(Http::Headers::get().OtSpanContext, "context"); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - EXPECT_EQ(FilterDataStatus::StopIterationAndBuffer, filter_->decodeData(data_, false)); + EXPECT_EQ(FilterDataStatus::StopIterationAndWatermark, filter_->decodeData(data_, false)); EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->decodeTrailers(request_headers_)); EXPECT_CALL(filter_callbacks_, continueDecoding()); diff --git a/test/common/http/http2/codec_impl_test.cc b/test/common/http/http2/codec_impl_test.cc index 25bb5f61e8345..2713f0d4cecc2 100644 --- a/test/common/http/http2/codec_impl_test.cc +++ b/test/common/http/http2/codec_impl_test.cc @@ -370,6 +370,10 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { // If this limit is changed, this test will fail due to the initial large writes being divided // into more than 4 frames. Fast fail here with this explanatory comment. ASSERT_EQ(65535, initial_stream_window); + // Make sure the limits were configured properly in test set up. + EXPECT_EQ(initial_stream_window, server_.getStream(1)->bufferLimit()); + EXPECT_EQ(initial_stream_window, client_.getStream(1)->bufferLimit()); + // One large write gets broken into smaller frames. EXPECT_CALL(request_decoder_, decodeData(_, false)).Times(AnyNumber()); Buffer::OwnedImpl long_data(std::string(initial_stream_window, 'a')); diff --git a/test/common/http/utility_test.cc b/test/common/http/utility_test.cc index 3942f2b7285b2..2ff5864197028 100644 --- a/test/common/http/utility_test.cc +++ b/test/common/http/utility_test.cc @@ -7,12 +7,16 @@ #include "common/http/utility.h" #include "common/network/address_impl.h" +#include "test/mocks/http/mocks.h" #include "test/test_common/printers.h" #include "test/test_common/utility.h" #include "fmt/format.h" #include "gtest/gtest.h" +using testing::InvokeWithoutArgs; +using testing::_; + namespace Envoy { namespace Http { @@ -206,5 +210,25 @@ TEST(HttpUtility, TestParseCookieWithQuotes) { EXPECT_EQ(Utility::parseCookieValue(headers, "leadingdquote"), "\"foobar"); } +TEST(HttpUtility, SendLocalReply) { + MockStreamDecoderFilterCallbacks callbacks; + bool is_reset = false; + + EXPECT_CALL(callbacks, encodeHeaders_(_, false)); + EXPECT_CALL(callbacks, encodeData(_, true)); + Utility::sendLocalReply(callbacks, is_reset, Http::Code::PayloadTooLarge, "large"); +} + +TEST(HttpUtility, SendLocalReplyDestroyedEarly) { + MockStreamDecoderFilterCallbacks callbacks; + bool is_reset = false; + + EXPECT_CALL(callbacks, encodeHeaders_(_, false)).WillOnce(InvokeWithoutArgs([&]() -> void { + is_reset = true; + })); + EXPECT_CALL(callbacks, encodeData(_, true)).Times(0); + Utility::sendLocalReply(callbacks, is_reset, Http::Code::PayloadTooLarge, "large"); +} + } // namespace Http } // namespace Envoy diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index 9e1f20712129d..e902ffc9695bc 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -1231,7 +1231,7 @@ TEST_F(RouterTest, AutoHostRewriteDisabled) { class WatermarkTest : public RouterTest { public: - void sendRequest() { + void sendRequest(bool header_only_request = true, bool pool_ready = true) { EXPECT_CALL(callbacks_.route_->route_entry_, timeout()) .WillOnce(Return(std::chrono::milliseconds(0))); EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0); @@ -1245,11 +1245,14 @@ class WatermarkTest : public RouterTest { [&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { response_decoder_ = &decoder; - callbacks.onPoolReady(encoder_, cm_.conn_pool_.host_); + pool_callbacks_ = &callbacks; + if (pool_ready) { + callbacks.onPoolReady(encoder_, cm_.conn_pool_.host_); + } return nullptr; })); HttpTestUtility::addDefaultHeaders(headers_); - router_.decodeHeaders(headers_, true); + router_.decodeHeaders(headers_, header_only_request); } void sendResponse() { response_decoder_->decodeHeaders( @@ -1261,6 +1264,7 @@ class WatermarkTest : public RouterTest { Http::StreamCallbacks* stream_callbacks_; Http::StreamDecoder* response_decoder_ = nullptr; Http::TestHeaderMapImpl headers_; + Http::ConnectionPool::Callbacks* pool_callbacks_{nullptr}; }; TEST_F(WatermarkTest, DownstreamWatermarks) { @@ -1301,5 +1305,77 @@ TEST_F(WatermarkTest, UpstreamWatermarks) { sendResponse(); } +TEST_F(WatermarkTest, FilterWatermarks) { + EXPECT_CALL(callbacks_, decoderBufferLimit()).WillOnce(Return(10)); + router_.setDecoderFilterCallbacks(callbacks_); + // Send the headers sans-fin, and don't flag the pool as ready. + sendRequest(false, false); + + // Send 10 bytes of body to fill the 10 byte buffer. + Buffer::OwnedImpl data("1234567890"); + router_.decodeData(data, false); + EXPECT_EQ(0u, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_flow_control_backed_up_total") + .value()); + + // Send one extra byte. This should cause the buffer to go over the limit and pause downstream + // data. + Buffer::OwnedImpl last_byte("!"); + router_.decodeData(last_byte, true); + EXPECT_EQ(1U, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_flow_control_backed_up_total") + .value()); + + // Now set up the downstream connection. The encoder will be given the buffered request body, + // The mock invocation below drains it, and the buffer will go under the watermark limit again. + EXPECT_EQ(0U, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_flow_control_drained_total") + .value()); + EXPECT_CALL(encoder_, encodeData(_, true)) + .WillOnce(Invoke([&](Buffer::Instance& data, bool) -> void { data.drain(data.length()); })); + pool_callbacks_->onPoolReady(encoder_, cm_.conn_pool_.host_); + EXPECT_EQ(1U, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_flow_control_drained_total") + .value()); + + sendResponse(); +} // namespace Router + +// Same as RetryRequestNotComplete but with decodeData larger than the buffer +// limit, no retry will occur. +TEST_F(WatermarkTest, RetryRequestNotComplete) { + EXPECT_CALL(callbacks_, decoderBufferLimit()).WillOnce(Return(10)); + router_.setDecoderFilterCallbacks(callbacks_); + NiceMock encoder1; + Http::StreamDecoder* response_decoder = nullptr; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillRepeatedly(Invoke( + [&](Http::StreamDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_); + return nullptr; + })); + EXPECT_CALL(callbacks_.request_info_, + setResponseFlag(Http::AccessLog::ResponseFlag::UpstreamRemoteReset)); + EXPECT_CALL(callbacks_.request_info_, onUpstreamHostSelected(_)) + .WillRepeatedly(Invoke([&](const Upstream::HostDescriptionConstSharedPtr host) -> void { + EXPECT_EQ(host_address_, host->address()); + })); + + Http::TestHeaderMapImpl headers{{"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}}; + HttpTestUtility::addDefaultHeaders(headers); + router_.decodeHeaders(headers, false); + Buffer::OwnedImpl data("1234567890123"); + EXPECT_CALL(*router_.retry_state_, enabled()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(*router_.retry_state_, shouldRetry(_, _, _)).Times(0); + // This will result in retry_state_ being deleted. + router_.decodeData(data, false); + + // This should not trigger a retry as the retry state has been deleted. + EXPECT_CALL(cm_.conn_pool_.host_->outlier_detector_, putHttpResponseCode(503)); + encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset); +} + } // namespace Router } // namespace Envoy diff --git a/test/config/integration/server.json b/test/config/integration/server.json index 90c9670f87268..89cc493fa0550 100644 --- a/test/config/integration/server.json +++ b/test/config/integration/server.json @@ -214,6 +214,101 @@ } }] }, + { + "address": "tcp://{{ ip_loopback_address }}:0", + "per_connection_buffer_limit_bytes": 1024, + "filters": [ + { + "type": "read", + "name": "http_connection_manager", + "config": { + "codec_type": "http1", + "stat_prefix": "router", + "route_config": + { + "virtual_hosts": [ + { + "name": "integration", + "domains": [ "*" ], + "routes": [ + { + "prefix": "/test/long/url", + "cluster": "cluster_3" + } + ] + } + ] + }, + "filters": [ + { "type": "decoder", "name": "router", "config": {} } + ] + } + }] + }, + { + "address": "tcp://{{ ip_loopback_address }}:0", + "per_connection_buffer_limit_bytes": 1024, + "filters": [ + { + "type": "read", + "name": "http_connection_manager", + "config": { + "codec_type": "http1", + "stat_prefix": "router", + "route_config": + { + "virtual_hosts": [ + { + "name": "integration", + "domains": [ "*" ], + "routes": [ + { + "prefix": "/dynamo/url", + "cluster": "cluster_3" + } + ] + } + ] + }, + "filters": [ + { "type": "both", "name": "http_dynamo_filter", "config": {} }, + { "type": "decoder", "name": "router", "config": {} } + ] + } + }] + }, + { + "address": "tcp://{{ ip_loopback_address }}:0", + "per_connection_buffer_limit_bytes": 1024, + "filters": [ + { + "type": "read", + "name": "http_connection_manager", + "config": { + "codec_type": "http1", + "stat_prefix": "router", + "route_config": + { + "virtual_hosts": [ + { + "name": "integration", + "domains": [ "*" ], + "routes": [ + { + "prefix": "/test/long/url", + "cluster": "cluster_3" + } + ] + } + ] + }, + "filters": [ + { "type": "both", "name": "grpc_http1_bridge", "config": {} }, + { "type": "decoder", "name": "router", "config": {} } + ] + } + }] + }, { "address": "tcp://{{ ip_loopback_address }}:0", "filters": [ @@ -412,6 +507,14 @@ "dns_lookup_family": "{{ dns_lookup_family }}", "hosts": [{"url": "tcp://localhost:{{ upstream_1 }}"}] }, + { + "name": "cluster_3", + "per_connection_buffer_limit_bytes": 1024, + "connect_timeout_ms": 5000, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://{{ ip_loopback_address }}:{{ upstream_0 }}"}] + }, { "name": "statsd", "connect_timeout_ms": 5000, diff --git a/test/config/integration/server_http2.json b/test/config/integration/server_http2.json index 6d1ce16ed95d1..209c9513844e2 100644 --- a/test/config/integration/server_http2.json +++ b/test/config/integration/server_http2.json @@ -184,7 +184,7 @@ "config": { "codec_type": "http2", "http2_settings": { - "per_stream_buffer_limit": 1024 + "initial_stream_window_size": 65535 }, "drain_timeout_ms": 5000, "stat_prefix": "router", @@ -208,6 +208,42 @@ ] } }] + }, + { + "address": "tcp://{{ ip_loopback_address }}:0", + "per_connection_buffer_limit_bytes": 1024, + "filters": [ + { + "type": "read", + "name": "http_connection_manager", + "config": { + "codec_type": "http2", + "http2_settings": { + "initial_stream_window_size": 65535 + }, + "drain_timeout_ms": 5000, + "stat_prefix": "router", + "route_config": + { + "virtual_hosts": [ + { + "name": "integration", + "domains": [ "*" ], + "routes": [ + { + "prefix": "/dynamo/url", + "cluster": "cluster_3" + } + ] + } + ] + }, + "filters": [ + { "type": "both", "name": "http_dynamo_filter", "config": {} }, + { "type": "decoder", "name": "router", "config": {}} + ] + } + }] }], "admin": { "access_log_path": "/dev/null", "address": "tcp://{{ ip_loopback_address }}:0" }, diff --git a/test/config/integration/server_http2_upstream.json b/test/config/integration/server_http2_upstream.json index f1654de28f2eb..d650e34345b7d 100644 --- a/test/config/integration/server_http2_upstream.json +++ b/test/config/integration/server_http2_upstream.json @@ -264,7 +264,7 @@ "config": { "codec_type": "http2", "http2_settings": { - "per_stream_buffer_limit": 1024 + "initial_stream_window_size": 65535 }, "drain_timeout_ms": 5000, "stat_prefix": "router", diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index 0ab1ef72fa74c..8b605622eac11 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -60,7 +60,7 @@ TEST_P(Http2IntegrationTest, RouterRequestAndResponseWithGiantBodyBuffer) { } TEST_P(Http2IntegrationTest, FlowControlOnAndGiantBody) { - testRouterRequestAndResponseWithBody(makeClientConnection(lookupPort("http_buffer_limits")), + testRouterRequestAndResponseWithBody(makeClientConnection(lookupPort("http_with_buffer_limits")), Http::CodecClient::Type::HTTP2, 1024 * 1024, 1024 * 1024, false); } @@ -114,6 +114,18 @@ TEST_P(Http2IntegrationTest, TwoRequests) { testTwoRequests(Http::CodecClient::T TEST_P(Http2IntegrationTest, Retry) { testRetry(Http::CodecClient::Type::HTTP2); } +TEST_P(Http2IntegrationTest, RetryHittingBufferLimit) { + testRetryHittingBufferLimit(Http::CodecClient::Type::HTTP2); +} + +TEST_P(Http2IntegrationTest, HittingDecoderFilterLimit) { + testHittingDecoderFilterLimit(Http::CodecClient::Type::HTTP2); +} + +TEST_P(Http2IntegrationTest, HittingEncoderFilterLimit) { + testHittingEncoderFilterLimit(Http::CodecClient::Type::HTTP2); +} + TEST_P(Http2IntegrationTest, GrpcRetry) { testGrpcRetry(); } TEST_P(Http2IntegrationTest, MaxHeadersInCodec) { @@ -286,7 +298,7 @@ TEST_P(Http2IntegrationTest, SimultaneousRequest) { } TEST_P(Http2IntegrationTest, SimultaneousRequestWithBufferLimits) { - simultaneousRequest(lookupPort("http_buffer_limits"), 1024 * 32, 1024 * 16); + simultaneousRequest(lookupPort("http_with_buffer_limits"), 1024 * 32, 1024 * 16); } } // namespace Envoy diff --git a/test/integration/http2_integration_test.h b/test/integration/http2_integration_test.h index a9c624273539a..6423988ee2927 100644 --- a/test/integration/http2_integration_test.h +++ b/test/integration/http2_integration_test.h @@ -17,8 +17,9 @@ class Http2IntegrationTest : public BaseIntegrationTest, registerPort("upstream_0", fake_upstreams_.back()->localAddress()->ip()->port()); fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP1, version_)); registerPort("upstream_1", fake_upstreams_.back()->localAddress()->ip()->port()); - createTestServer("test/config/integration/server_http2.json", - {"echo", "http", "http_buffer", "http_buffer_limits"}); + createTestServer( + "test/config/integration/server_http2.json", + {"echo", "http", "http_buffer", "http_with_buffer_limits", "dynamo_with_buffer_limits"}); } void simultaneousRequest(uint32_t port, int32_t request1_bytes, int32_t request2_bytes); diff --git a/test/integration/http2_upstream_integration_test.cc b/test/integration/http2_upstream_integration_test.cc index 30ccf05b5cffb..c69d32c0f11fc 100644 --- a/test/integration/http2_upstream_integration_test.cc +++ b/test/integration/http2_upstream_integration_test.cc @@ -95,6 +95,10 @@ TEST_P(Http2UpstreamIntegrationTest, TwoRequests) { TEST_P(Http2UpstreamIntegrationTest, Retry) { testRetry(Http::CodecClient::Type::HTTP2); } +TEST_P(Http2UpstreamIntegrationTest, RetryHittingBufferLimit) { + testRetryHittingBufferLimit(Http::CodecClient::Type::HTTP2); +} + TEST_P(Http2UpstreamIntegrationTest, GrpcRetry) { testGrpcRetry(); } TEST_P(Http2UpstreamIntegrationTest, DownstreamResetBeforeResponseComplete) { diff --git a/test/integration/integration.cc b/test/integration/integration.cc index 0b83c3c53a266..296ffca97663c 100644 --- a/test/integration/integration.cc +++ b/test/integration/integration.cc @@ -46,6 +46,13 @@ std::string normalizeDate(const std::string& s) { IntegrationStreamDecoder::IntegrationStreamDecoder(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} +void IntegrationStreamDecoder::waitForHeaders() { + if (!headers_.get()) { + waiting_for_headers_ = true; + dispatcher_.run(Event::Dispatcher::RunType::Block); + } +} + void IntegrationStreamDecoder::waitForBodyData(uint64_t size) { ASSERT(body_data_waiting_length_ == 0); body_data_waiting_length_ = size; @@ -69,7 +76,7 @@ void IntegrationStreamDecoder::waitForReset() { void IntegrationStreamDecoder::decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) { saw_end_stream_ = end_stream; headers_ = std::move(headers); - if (end_stream && waiting_for_end_stream_) { + if ((end_stream && waiting_for_end_stream_) || waiting_for_headers_) { dispatcher_.exit(); } } @@ -696,7 +703,7 @@ void BaseIntegrationTest::testRetry(Http::CodecClient::Type type) { executeActions( {[&]() -> void { codec_client_ = makeHttpConnection(lookupPort("http"), type); }, [&]() -> void { - codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "GET"}, + codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "POST"}, {":path", "/test/long/url"}, {":scheme", "http"}, {":authority", "host"}, @@ -784,10 +791,102 @@ void BaseIntegrationTest::testGrpcRetry() { EXPECT_THAT(*response_->trailers(), HeaderMapEqualRef(&response_trailers)); } }, - // Cleanup both downstream and upstream - [&]() -> void { codec_client_->close(); }, - [&]() -> void { fake_upstream_connection_->close(); }, - [&]() -> void { fake_upstream_connection_->waitForDisconnect(); }}); + [&]() -> void { cleanupUpstreamAndDownstream(); }}); +} + +// Very similar set-up to testRetry but with a 16k request the request will not +// be buffered and the 503 will be returned to the user. +void BaseIntegrationTest::testRetryHittingBufferLimit(Http::CodecClient::Type type) { + executeActions( + {[&]() -> void { + codec_client_ = makeHttpConnection(lookupPort("http_with_buffer_limits"), type); + }, + [&]() -> void { + codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-forwarded-for", "10.0.0.1"}, + {"x-envoy-retry-on", "5xx"}}, + 1024 * 65, *response_); + }, + [&]() -> void { waitForNextUpstreamRequest(); }, + [&]() -> void { + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "503"}}, true); + }, + [&]() -> void { + response_->waitForEndStream(); + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_EQ(66560U, upstream_request_->bodyLength()); + + EXPECT_TRUE(response_->complete()); + EXPECT_STREQ("503", response_->headers().Status()->value().c_str()); + }, + [&]() -> void { cleanupUpstreamAndDownstream(); }}); +} + +// Test hitting the dynamo filter with too many request bytes to buffer. Ensure the connection +// manager sends a 413. +void BaseIntegrationTest::testHittingDecoderFilterLimit(Http::CodecClient::Type type) { + executeActions( + {[&]() -> void { + codec_client_ = makeHttpConnection(lookupPort("dynamo_with_buffer_limits"), type); + }, + [&]() -> void { + codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/dynamo/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-forwarded-for", "10.0.0.1"}, + {"x-envoy-retry-on", "5xx"}}, + 1024 * 65, *response_); + }, + [&]() -> void { + fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_); + }, + [&]() -> void { + response_->waitForEndStream(); + EXPECT_TRUE(response_->complete()); + EXPECT_STREQ("413", response_->headers().Status()->value().c_str()); + }, + [&]() -> void { cleanupUpstreamAndDownstream(); }}); +} + +// Test hitting the dynamo filter with too many response bytes to buffer. Given the request headers +// are sent on early, the stream/connection will be reset. +void BaseIntegrationTest::testHittingEncoderFilterLimit(Http::CodecClient::Type type) { + executeActions( + {[&]() -> void { + codec_client_ = makeHttpConnection(lookupPort("dynamo_with_buffer_limits"), type); + }, + [&]() -> void { + auto downstream_request = + &codec_client_->startRequest(Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/dynamo/url"}, + {":scheme", "http"}, + {":authority", "host"}}, + *response_); + + Buffer::OwnedImpl data("{\"TableName\":\"locations\"}"); + codec_client_->sendData(*downstream_request, data, true); + }, + [&]() -> void { waitForNextUpstreamRequest(); }, + [&]() -> void { + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + // Make sure the headers are received before the body is sent. + response_->waitForHeaders(); + upstream_request_->encodeData(1024 * 65, false); + }, + [&]() -> void { + if (type == Http::CodecClient::Type::HTTP2) { + response_->waitForReset(); + } else { + response_->waitForEndStream(); + } + EXPECT_FALSE(response_->complete()); + EXPECT_STREQ("200", response_->headers().Status()->value().c_str()); + }, + [&]() -> void { cleanupUpstreamAndDownstream(); }}); } void BaseIntegrationTest::testTwoRequests(Http::CodecClient::Type type) { diff --git a/test/integration/integration.h b/test/integration/integration.h index 386c1a4436fe9..51c331cbbb46b 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -34,6 +34,7 @@ class IntegrationStreamDecoder : public Http::StreamDecoder, public Http::Stream Http::StreamResetReason reset_reason() { return reset_reason_; } const Http::HeaderMap& headers() { return *headers_; } const Http::HeaderMapPtr& trailers() { return trailers_; } + void waitForHeaders(); void waitForBodyData(uint64_t size); void waitForEndStream(); void waitForReset(); @@ -57,6 +58,7 @@ class IntegrationStreamDecoder : public Http::StreamDecoder, public Http::Stream std::string body_; uint64_t body_data_waiting_length_{}; bool waiting_for_reset_{}; + bool waiting_for_headers_{}; bool saw_reset_{}; Http::StreamResetReason reset_reason_{}; }; @@ -257,7 +259,10 @@ class BaseIntegrationTest : Logger::Loggable { void testMultipleContentLengths(Http::CodecClient::Type type); void testDrainClose(Http::CodecClient::Type type); void testRetry(Http::CodecClient::Type type); + void testRetryHittingBufferLimit(Http::CodecClient::Type type); void testGrpcRetry(); + void testHittingDecoderFilterLimit(Http::CodecClient::Type type); + void testHittingEncoderFilterLimit(Http::CodecClient::Type type); // HTTP/2 client tests. void testDownstreamResetBeforeResponseComplete(); diff --git a/test/integration/integration_test.cc b/test/integration/integration_test.cc index 7e06eac3f84ab..1245f19283143 100644 --- a/test/integration/integration_test.cc +++ b/test/integration/integration_test.cc @@ -66,6 +66,12 @@ TEST_P(IntegrationTest, RouterRequestAndResponseWithGiantBodyBuffer) { 4 * 1024 * 1024, false); } +TEST_P(IntegrationTest, FlowControlOnAndGiantBody) { + testRouterRequestAndResponseWithBody(makeClientConnection(lookupPort("http_with_buffer_limits")), + Http::CodecClient::Type::HTTP1, 1024 * 1024, 1024 * 1024, + false); +} + TEST_P(IntegrationTest, RouterRequestAndResponseLargeHeaderNoBuffer) { testRouterRequestAndResponseWithBody(makeClientConnection(lookupPort("http")), Http::CodecClient::Type::HTTP1, 1024, 512, true); @@ -113,6 +119,63 @@ TEST_P(IntegrationTest, RouterUpstreamResponseBeforeRequestComplete) { TEST_P(IntegrationTest, Retry) { testRetry(Http::CodecClient::Type::HTTP1); } +TEST_P(IntegrationTest, RetryHittingBufferLimit) { + testRetryHittingBufferLimit(Http::CodecClient::Type::HTTP1); +} + +TEST_P(IntegrationTest, HittingDecoderFilterLimit) { + testHittingDecoderFilterLimit(Http::CodecClient::Type::HTTP1); +} + +// Test hitting the bridge filter with too many response bytes to buffer. Given +// the headers are not proxied, the connection manager will send a 500. +TEST_P(IntegrationTest, HittingEncoderFilterLimitBufferingHeaders) { + IntegrationCodecClientPtr codec_client; + FakeHttpConnectionPtr fake_upstream_connection; + IntegrationStreamDecoderPtr response(new IntegrationStreamDecoder(*dispatcher_)); + FakeStreamPtr request; + executeActions({[&]() -> void { + codec_client = makeHttpConnection(lookupPort("bridge_with_buffer_limits"), + Http::CodecClient::Type::HTTP1); + }, + [&]() -> void { + Http::StreamEncoder* request_encoder; + request_encoder = &codec_client->startRequest( + Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"content-type", "application/grpc"}, + {"x-envoy-retry-grpc-on", "cancelled"}}, + *response); + codec_client->sendData(*request_encoder, 1024, true); + }, + [&]() -> void { + fake_upstream_connection = + fake_upstreams_[0]->waitForHttpConnection(*dispatcher_); + }, + [&]() -> void { request = fake_upstream_connection->waitForNewStream(); }, + [&]() -> void { request->waitForEndStream(*dispatcher_); }, + [&]() -> void { + request->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + // Make sure the headers are received before the body is sent. + request->encodeData(1024 * 65, false); + }, + [&]() -> void { + response->waitForEndStream(); + EXPECT_TRUE(response->complete()); + EXPECT_STREQ("500", response->headers().Status()->value().c_str()); + }, + // Cleanup both downstream and upstream + [&]() -> void { codec_client->close(); }, + [&]() -> void { fake_upstream_connection->close(); }, + [&]() -> void { fake_upstream_connection->waitForDisconnect(); }}); +} + +TEST_P(IntegrationTest, HittingEncoderFilterLimit) { + testHittingEncoderFilterLimit(Http::CodecClient::Type::HTTP1); +} + TEST_P(IntegrationTest, TwoRequests) { testTwoRequests(Http::CodecClient::Type::HTTP1); } TEST_P(IntegrationTest, BadFirstline) { testBadFirstline(); } diff --git a/test/integration/integration_test.h b/test/integration/integration_test.h index 5efcdd9f850fc..1110b839a4631 100644 --- a/test/integration/integration_test.h +++ b/test/integration/integration_test.h @@ -18,7 +18,9 @@ class IntegrationTest : public BaseIntegrationTest, fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP1, version_)); registerPort("upstream_1", fake_upstreams_.back()->localAddress()->ip()->port()); createTestServer("test/config/integration/server.json", - {"http", "http_forward", "http_buffer", "rds"}); + {"http", "http_forward", "http_with_buffer_limits", + "dynamo_with_buffer_limits", "bridge_with_buffer_limits", "http_buffer", + "tcp_proxy", "rds"}); } /** diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index c8993889ae9a9..e9b180477937b 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -161,6 +161,7 @@ class MockStream : public Stream { MOCK_METHOD1(resetStream, void(StreamResetReason reason)); MOCK_METHOD1(readDisable, void(bool disable)); MOCK_METHOD2(setWriteBufferWatermarks, void(uint32_t, uint32_t)); + MOCK_METHOD0(bufferLimit, uint32_t()); std::list callbacks_{}; }; @@ -251,6 +252,8 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD0(onDecoderFilterBelowWriteBufferLowWatermark, void()); MOCK_METHOD1(addDownstreamWatermarkCallbacks, void(DownstreamWatermarkCallbacks&)); MOCK_METHOD1(removeDownstreamWatermarkCallbacks, void(DownstreamWatermarkCallbacks&)); + MOCK_METHOD1(setDecoderBufferLimit, void(uint32_t)); + MOCK_METHOD0(decoderBufferLimit, uint32_t()); // Http::StreamDecoderFilterCallbacks void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) override { @@ -259,7 +262,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, void encodeTrailers(HeaderMapPtr&& trailers) override { encodeTrailers_(*trailers); } MOCK_METHOD0(continueDecoding, void()); - MOCK_METHOD1(addDecodedData, void(Buffer::Instance& data)); + MOCK_METHOD2(addDecodedData, void(Buffer::Instance& data, bool streaming)); MOCK_METHOD0(decodingBuffer, const Buffer::Instance*()); MOCK_METHOD2(encodeHeaders_, void(HeaderMap& headers, bool end_stream)); MOCK_METHOD2(encodeData, void(Buffer::Instance& data, bool end_stream)); @@ -287,9 +290,11 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks, MOCK_METHOD0(downstreamAddress, const std::string&()); MOCK_METHOD0(onEncoderFilterAboveWriteBufferHighWatermark, void()); MOCK_METHOD0(onEncoderFilterBelowWriteBufferLowWatermark, void()); + MOCK_METHOD1(setEncoderBufferLimit, void(uint32_t)); + MOCK_METHOD0(encoderBufferLimit, uint32_t()); // Http::StreamEncoderFilterCallbacks - MOCK_METHOD1(addEncodedData, void(Buffer::Instance& data)); + MOCK_METHOD2(addEncodedData, void(Buffer::Instance& data, bool streaming)); MOCK_METHOD0(continueEncoding, void()); MOCK_METHOD0(encodingBuffer, const Buffer::Instance*());