diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 1fe100755b71d..63d279529f41b 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -1547,16 +1547,16 @@ bool ConnectionManagerImpl::ActiveStream::verbose() const { void ConnectionManagerImpl::ActiveStream::callHighWatermarkCallbacks() { ++high_watermark_count_; - if (watermark_callbacks_) { - watermark_callbacks_->onAboveWriteBufferHighWatermark(); + for (auto watermark_callbacks : watermark_callbacks_) { + watermark_callbacks->onAboveWriteBufferHighWatermark(); } } void ConnectionManagerImpl::ActiveStream::callLowWatermarkCallbacks() { ASSERT(high_watermark_count_ > 0); --high_watermark_count_; - if (watermark_callbacks_) { - watermark_callbacks_->onBelowWriteBufferLowWatermark(); + for (auto watermark_callbacks : watermark_callbacks_) { + watermark_callbacks->onBelowWriteBufferLowWatermark(); } } @@ -1891,19 +1891,20 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter:: void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDownstreamWatermarkCallbacks( DownstreamWatermarkCallbacks& watermark_callbacks) { - // This is called exactly once per stream, by the router filter. - // If there's ever a need for another filter to subscribe to watermark callbacks this can be - // turned into a vector. - ASSERT(parent_.watermark_callbacks_ == nullptr); - parent_.watermark_callbacks_ = &watermark_callbacks; + // This is called exactly once per upstream-stream, by the router filter. Therefore, we + // expect the same callbacks to not be registered twice. + ASSERT(std::find(parent_.watermark_callbacks_.begin(), parent_.watermark_callbacks_.end(), + &watermark_callbacks) == parent_.watermark_callbacks_.end()); + parent_.watermark_callbacks_.emplace(parent_.watermark_callbacks_.end(), &watermark_callbacks); for (uint32_t i = 0; i < parent_.high_watermark_count_; ++i) { watermark_callbacks.onAboveWriteBufferHighWatermark(); } } void ConnectionManagerImpl::ActiveStreamDecoderFilter::removeDownstreamWatermarkCallbacks( DownstreamWatermarkCallbacks& watermark_callbacks) { - ASSERT(parent_.watermark_callbacks_ == &watermark_callbacks); - parent_.watermark_callbacks_ = nullptr; + ASSERT(std::find(parent_.watermark_callbacks_.begin(), parent_.watermark_callbacks_.end(), + &watermark_callbacks) != parent_.watermark_callbacks_.end()); + parent_.watermark_callbacks_.remove(&watermark_callbacks); } bool ConnectionManagerImpl::ActiveStreamDecoderFilter::recreateStream() { diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index a968d1109f7a4..76a354daa8aa6 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -501,7 +501,7 @@ class ConnectionManagerImpl : Logger::Loggable, StreamInfo::StreamInfoImpl stream_info_; absl::optional cached_route_; absl::optional cached_cluster_info_; - DownstreamWatermarkCallbacks* watermark_callbacks_{nullptr}; + std::list watermark_callbacks_{}; uint32_t buffer_limit_{0}; uint32_t high_watermark_count_{0}; const std::string* decorated_operation_{nullptr}; diff --git a/source/common/router/BUILD b/source/common/router/BUILD index 498babb17239e..39c941bc3f06d 100644 --- a/source/common/router/BUILD +++ b/source/common/router/BUILD @@ -145,6 +145,7 @@ envoy_cc_library( "//source/common/common:enum_to_int", "//source/common/common:hash_lib", "//source/common/common:hex_lib", + "//source/common/common:linked_object", "//source/common/common:minimal_logger_lib", "//source/common/common:utility_lib", "//source/common/grpc:common_lib", diff --git a/source/common/router/router.cc b/source/common/router/router.cc index e2bd72dedd598..efb31bcecc3a6 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -181,7 +181,7 @@ FilterUtility::finalTimeout(const RouteEntry& route, Http::HeaderMap& request_he Filter::~Filter() { // Upstream resources should already have been cleaned. - ASSERT(!upstream_request_); + ASSERT(upstream_requests_.empty()); ASSERT(!retry_state_); } @@ -380,8 +380,9 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers); - upstream_request_ = std::make_unique(*this, *conn_pool); - upstream_request_->encodeHeaders(end_stream); + UpstreamRequestPtr upstream_request = std::make_unique(*this, *conn_pool); + upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_); + upstream_requests_.front()->encodeHeaders(end_stream); if (end_stream) { onRequestComplete(); } @@ -413,6 +414,7 @@ void Filter::sendNoHealthyUpstreamResponse() { } Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) { + ASSERT(upstream_requests_.size() == 1); bool buffering = (retry_state_ && retry_state_->enabled()) || do_shadowing_; if (buffering && buffer_limit_ > 0 && getLength(callbacks_->decodingBuffer()) + data.length() > buffer_limit_) { @@ -427,7 +429,7 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea // If we are going to buffer for retries or shadowing, we need to make a copy before encoding // since it's all moves from here on. Buffer::OwnedImpl copy(data); - upstream_request_->encodeData(copy, end_stream); + upstream_requests_.front()->encodeData(copy, end_stream); // If we are potentially going to retry or shadow this request we need to buffer. // This will not cause the connection manager to 413 because before we hit the @@ -436,7 +438,7 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea // potentially shadow. callbacks_->addDecodedData(data, true); } else { - upstream_request_->encodeData(data, end_stream); + upstream_requests_.front()->encodeData(data, end_stream); } if (end_stream) { @@ -449,7 +451,8 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea Http::FilterTrailersStatus Filter::decodeTrailers(Http::HeaderMap& trailers) { ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers); downstream_trailers_ = &trailers; - upstream_request_->encodeTrailers(trailers); + ASSERT(upstream_requests_.size() == 1); + upstream_requests_.front()->encodeTrailers(trailers); onRequestComplete(); return Http::FilterTrailersStatus::StopIteration; } @@ -463,13 +466,15 @@ void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callb } void Filter::cleanup() { - // upstream_request_ is only destroyed in this method (cleanup()) or when we + ASSERT(upstream_requests_.size() <= 1); + // UpstreamRequests are only destroyed in this method (cleanup()) or when we // do a retry (setupRetry()). In the latter case we don't want to save the // upstream timings to the downstream info. - if (upstream_request_) { - callbacks_->streamInfo().setUpstreamTiming(upstream_request_->upstream_timing_); + if (upstream_requests_.size() == 1) { + UpstreamRequestPtr upstream_request = + upstream_requests_.back()->removeFromList(upstream_requests_); + callbacks_->streamInfo().setUpstreamTiming(upstream_request->upstream_timing_); } - upstream_request_.reset(); retry_state_.reset(); if (response_timeout_) { response_timeout_->disableTimer(); @@ -502,7 +507,7 @@ void Filter::onRequestComplete() { downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime(); // Possible that we got an immediate reset. - if (upstream_request_) { + if (upstream_requests_.size() == 1) { // Even if we got an immediate reset, we could still shadow, but that is a riskier change and // seems unnecessary right now. maybeDoShadowing(); @@ -515,8 +520,8 @@ void Filter::onRequestComplete() { } void Filter::onDestroy() { - if (upstream_request_ && !attempting_internal_redirect_with_complete_stream_) { - upstream_request_->resetStream(); + if (upstream_requests_.size() == 1 && !attempting_internal_redirect_with_complete_stream_) { + upstream_requests_.front()->resetStream(); } cleanup(); } @@ -525,35 +530,32 @@ void Filter::onResponseTimeout() { ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_); cluster_->stats().upstream_rq_timeout_.inc(); - // It's possible to timeout during a retry backoff delay when we have no upstream request. - if (upstream_request_) { - if (upstream_request_->upstream_host_) { - upstream_request_->upstream_host_->stats().rq_timeout_.inc(); + ASSERT(upstream_requests_.size() <= 1); + if (upstream_requests_.size() == 1) { + if (upstream_requests_.front()->upstream_host_) { + upstream_requests_.front()->upstream_host_->stats().rq_timeout_.inc(); } - upstream_request_->resetStream(); + + updateOutlierDetection(timeout_response_code_, *upstream_requests_.front().get()); + upstream_requests_.front()->resetStream(); } - updateOutlierDetection(timeout_response_code_); onUpstreamTimeoutAbort(StreamInfo::ResponseFlag::UpstreamRequestTimeout); } -void Filter::onPerTryTimeout() { - updateOutlierDetection(timeout_response_code_); +void Filter::onPerTryTimeout(UpstreamRequest& upstream_request) { + updateOutlierDetection(timeout_response_code_, upstream_request); - if (maybeRetryReset(Http::StreamResetReason::LocalReset)) { + if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request)) { return; } onUpstreamTimeoutAbort(StreamInfo::ResponseFlag::UpstreamRequestTimeout); } -void Filter::updateOutlierDetection(Http::Code code) { - Upstream::HostDescriptionConstSharedPtr upstream_host; - if (upstream_request_) { - upstream_host = upstream_request_->upstream_host_; - if (upstream_host) { - upstream_host->outlierDetector().putHttpResponseCode(enumToInt(code)); - } +void Filter::updateOutlierDetection(Http::Code code, UpstreamRequest& upstream_request) { + if (upstream_request.upstream_host_) { + upstream_request.upstream_host_->outlierDetector().putHttpResponseCode(enumToInt(code)); } } @@ -565,11 +567,12 @@ void Filter::onUpstreamTimeoutAbort(StreamInfo::ResponseFlag response_flags) { void Filter::onUpstreamAbort(Http::Code code, StreamInfo::ResponseFlag response_flags, absl::string_view body, bool dropped) { + ASSERT(upstream_requests_.size() <= 1); // If we have not yet sent anything downstream, send a response with an appropriate status code. // Otherwise just reset the ongoing response. if (downstream_response_started_) { - if (upstream_request_ != nullptr && upstream_request_->grpc_rq_success_deferred_) { - upstream_request_->upstream_host_->stats().rq_error_.inc(); + if (upstream_requests_.size() == 1 && upstream_requests_.front()->grpc_rq_success_deferred_) { + upstream_requests_.front()->upstream_host_->stats().rq_error_.inc(); config_.stats_.rq_reset_after_downstream_response_started_.inc(); } // This will destroy any created retry timers. @@ -577,8 +580,8 @@ void Filter::onUpstreamAbort(Http::Code code, StreamInfo::ResponseFlag response_ callbacks_->resetStream(); } else { Upstream::HostDescriptionConstSharedPtr upstream_host; - if (upstream_request_) { - upstream_host = upstream_request_->upstream_host_; + if (upstream_requests_.size() == 1) { + upstream_host = upstream_requests_.front()->upstream_host_; } // This will destroy any created retry timers. @@ -605,16 +608,15 @@ void Filter::onUpstreamAbort(Http::Code code, StreamInfo::ResponseFlag response_ } } -bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason) { +bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason, + UpstreamRequest& upstream_request) { // We don't retry if we already started the response. if (downstream_response_started_ || !retry_state_) { return false; } Upstream::HostDescriptionConstSharedPtr upstream_host; - if (upstream_request_) { - upstream_host = upstream_request_->upstream_host_; - } + upstream_host = upstream_request.upstream_host_; // Notify retry modifiers about the attempted host. if (upstream_host != nullptr) { @@ -638,14 +640,14 @@ bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason) { } void Filter::onUpstreamReset(Http::StreamResetReason reset_reason, - absl::string_view transport_failure_reason) { - ASSERT(upstream_request_); + absl::string_view transport_failure_reason, + UpstreamRequest& upstream_request) { ENVOY_STREAM_LOG(debug, "upstream reset: reset reason {}", *callbacks_, Http::Utility::resetReasonToString(reset_reason)); - updateOutlierDetection(Http::Code::ServiceUnavailable); + updateOutlierDetection(Http::Code::ServiceUnavailable, upstream_request); - if (maybeRetryReset(reset_reason)) { + if (maybeRetryReset(reset_reason, upstream_request)) { return; } @@ -679,7 +681,8 @@ Filter::streamResetReasonToResponseFlag(Http::StreamResetReason reset_reason) { NOT_REACHED_GCOVR_EXCL_LINE; } -void Filter::handleNon5xxResponseHeaders(const Http::HeaderMap& headers, bool end_stream) { +void Filter::handleNon5xxResponseHeaders(const Http::HeaderMap& headers, + UpstreamRequest& upstream_request, bool end_stream) { // We need to defer gRPC success until after we have processed grpc-status in // the trailers. if (grpc_request_) { @@ -687,15 +690,15 @@ void Filter::handleNon5xxResponseHeaders(const Http::HeaderMap& headers, bool en absl::optional grpc_status = Grpc::Common::getGrpcStatus(headers); if (grpc_status && !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) { - upstream_request_->upstream_host_->stats().rq_success_.inc(); + upstream_request.upstream_host_->stats().rq_success_.inc(); } else { - upstream_request_->upstream_host_->stats().rq_error_.inc(); + upstream_request.upstream_host_->stats().rq_error_.inc(); } } else { - upstream_request_->grpc_rq_success_deferred_ = true; + upstream_request.grpc_rq_success_deferred_ = true; } } else { - upstream_request_->upstream_host_->stats().rq_success_.inc(); + upstream_request.upstream_host_->stats().rq_success_.inc(); } } @@ -713,24 +716,25 @@ void Filter::onUpstream100ContinueHeaders(Http::HeaderMapPtr&& headers) { } void Filter::onUpstreamHeaders(uint64_t response_code, Http::HeaderMapPtr&& headers, - bool end_stream) { + UpstreamRequest& upstream_request, bool end_stream) { + ASSERT(upstream_requests_.size() == 1); ENVOY_STREAM_LOG(debug, "upstream headers complete: end_stream={}", *callbacks_, end_stream); - upstream_request_->upstream_host_->outlierDetector().putHttpResponseCode(response_code); + upstream_request.upstream_host_->outlierDetector().putHttpResponseCode(response_code); if (headers->EnvoyImmediateHealthCheckFail() != nullptr) { - upstream_request_->upstream_host_->healthChecker().setUnhealthy(); + upstream_request.upstream_host_->healthChecker().setUnhealthy(); } if (retry_state_) { // Notify retry modifiers about the attempted host. - retry_state_->onHostAttempted(upstream_request_->upstream_host_); + retry_state_->onHostAttempted(upstream_request.upstream_host_); + // Capture upstream_host since setupRetry() in the following line will clear + // upstream_request. + const auto upstream_host = upstream_request.upstream_host_; const RetryStatus retry_status = retry_state_->shouldRetryHeaders(*headers, [this]() -> void { doRetry(); }); - // Capture upstream_host since setupRetry() in the following line will clear - // upstream_request_. - const auto upstream_host = upstream_request_->upstream_host_; if (retry_status == RetryStatus::Yes && setupRetry(end_stream)) { Http::CodeStats& code_stats = httpContext().codeStats(); code_stats.chargeBasicResponseStat(cluster_->statsScope(), "retry.", @@ -751,7 +755,7 @@ void Filter::onUpstreamHeaders(uint64_t response_code, Http::HeaderMapPtr&& head if (static_cast(response_code) == Http::Code::Found && route_entry_->internalRedirectAction() == InternalRedirectAction::Handle && - setupRedirect(*headers)) { + setupRedirect(*headers, upstream_request)) { return; // If the redirect could not be handled, fail open and let it pass to the // next downstream. @@ -769,12 +773,12 @@ void Filter::onUpstreamHeaders(uint64_t response_code, Http::HeaderMapPtr&& head } } - upstream_request_->upstream_canary_ = + upstream_request.upstream_canary_ = (headers->EnvoyUpstreamCanary() && headers->EnvoyUpstreamCanary()->value() == "true") || - upstream_request_->upstream_host_->canary(); - chargeUpstreamCode(response_code, *headers, upstream_request_->upstream_host_, false); + upstream_request.upstream_host_->canary(); + chargeUpstreamCode(response_code, *headers, upstream_request.upstream_host_, false); if (!Http::CodeUtility::is5xx(response_code)) { - handleNon5xxResponseHeaders(*headers, end_stream); + handleNon5xxResponseHeaders(*headers, upstream_request, end_stream); } // Append routing cookies @@ -789,35 +793,38 @@ void Filter::onUpstreamHeaders(uint64_t response_code, Http::HeaderMapPtr&& head downstream_response_started_ = true; if (end_stream) { - onUpstreamComplete(); + onUpstreamComplete(upstream_request); } callbacks_->encodeHeaders(std::move(headers), end_stream); } -void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) { +void Filter::onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request, + bool end_stream) { + ASSERT(upstream_requests_.size() == 1); if (end_stream) { // gRPC request termination without trailers is an error. - if (upstream_request_->grpc_rq_success_deferred_) { - upstream_request_->upstream_host_->stats().rq_error_.inc(); + if (upstream_request.grpc_rq_success_deferred_) { + upstream_request.upstream_host_->stats().rq_error_.inc(); } - onUpstreamComplete(); + onUpstreamComplete(upstream_request); } callbacks_->encodeData(data, end_stream); } -void Filter::onUpstreamTrailers(Http::HeaderMapPtr&& trailers) { - if (upstream_request_->grpc_rq_success_deferred_) { +void Filter::onUpstreamTrailers(Http::HeaderMapPtr&& trailers, UpstreamRequest& upstream_request) { + ASSERT(upstream_requests_.size() == 1); + if (upstream_request.grpc_rq_success_deferred_) { absl::optional grpc_status = Grpc::Common::getGrpcStatus(*trailers); if (grpc_status && !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) { - upstream_request_->upstream_host_->stats().rq_success_.inc(); + upstream_request.upstream_host_->stats().rq_success_.inc(); } else { - upstream_request_->upstream_host_->stats().rq_error_.inc(); + upstream_request.upstream_host_->stats().rq_error_.inc(); } } - onUpstreamComplete(); + onUpstreamComplete(upstream_request); callbacks_->encodeTrailers(std::move(trailers)); } @@ -825,9 +832,9 @@ void Filter::onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map) { callbacks_->encodeMetadata(std::move(metadata_map)); } -void Filter::onUpstreamComplete() { +void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) { if (!downstream_end_stream_) { - upstream_request_->resetStream(); + upstream_request.resetStream(); } if (config_.emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck() && @@ -836,7 +843,7 @@ void Filter::onUpstreamComplete() { std::chrono::milliseconds response_time = std::chrono::duration_cast( dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); - upstream_request_->upstream_host_->outlierDetector().putResponseTime(response_time); + upstream_request.upstream_host_->outlierDetector().putResponseTime(response_time); const Http::HeaderEntry* internal_request_header = downstream_headers_->EnvoyInternalRequest(); const bool internal_request = @@ -850,13 +857,13 @@ void Filter::onUpstreamComplete() { cluster_->statsScope(), EMPTY_STRING, response_time, - upstream_request_->upstream_canary_, + upstream_request.upstream_canary_, internal_request, route_entry_->virtualHost().name(), request_vcluster_ ? request_vcluster_->name() : EMPTY_STRING, zone_name, - upstreamZone(upstream_request_->upstream_host_)}; + upstreamZone(upstream_request.upstream_host_)}; code_stats.chargeResponseTiming(info); @@ -865,12 +872,12 @@ void Filter::onUpstreamComplete() { cluster_->statsScope(), alt_stat_prefix_, response_time, - upstream_request_->upstream_canary_, + upstream_request.upstream_canary_, internal_request, EMPTY_STRING, EMPTY_STRING, zone_name, - upstreamZone(upstream_request_->upstream_host_)}; + upstreamZone(upstream_request.upstream_host_)}; code_stats.chargeResponseTiming(info); } @@ -889,16 +896,18 @@ bool Filter::setupRetry(bool end_stream) { return false; } + ASSERT(upstream_requests_.size() == 1); ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_); if (!end_stream) { - upstream_request_->resetStream(); + upstream_requests_.front()->resetStream(); } - upstream_request_.reset(); + upstream_requests_.front()->removeFromList(upstream_requests_); + return true; } -bool Filter::setupRedirect(const Http::HeaderMap& headers) { +bool Filter::setupRedirect(const Http::HeaderMap& headers, UpstreamRequest& upstream_request) { ENVOY_STREAM_LOG(debug, "attempting internal redirect", *callbacks_); const Http::HeaderEntry* location = headers.Location(); @@ -912,7 +921,7 @@ bool Filter::setupRedirect(const Http::HeaderMap& headers) { // completion here and check it in onDestroy. This is annoyingly complicated but is better than // needlessly resetting streams. attempting_internal_redirect_with_complete_stream_ = - upstream_request_->upstream_timing_.last_upstream_rx_byte_received_ && downstream_end_stream_; + upstream_request.upstream_timing_.last_upstream_rx_byte_received_ && downstream_end_stream_; // As with setupRetry, redirects are not supported for streaming requests yet. if (downstream_end_stream_ && @@ -947,19 +956,19 @@ void Filter::doRetry() { } ASSERT(response_timeout_ || timeout_.global_timeout_.count() == 0); - ASSERT(!upstream_request_); - upstream_request_ = std::make_unique(*this, *conn_pool); - upstream_request_->encodeHeaders(!callbacks_->decodingBuffer() && !downstream_trailers_); + UpstreamRequestPtr upstream_request = std::make_unique(*this, *conn_pool); + upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_); + upstream_requests_.front()->encodeHeaders(!callbacks_->decodingBuffer() && !downstream_trailers_); // It's possible we got immediately reset. - if (upstream_request_) { + if (upstream_requests_.size() == 1) { if (callbacks_->decodingBuffer()) { // If we are doing a retry we need to make a copy. Buffer::OwnedImpl copy(*callbacks_->decodingBuffer()); - upstream_request_->encodeData(copy, !downstream_trailers_); + upstream_requests_.front()->encodeData(copy, !downstream_trailers_); } if (downstream_trailers_) { - upstream_request_->encodeTrailers(*downstream_trailers_); + upstream_requests_.front()->encodeTrailers(*downstream_trailers_); } } } @@ -1012,19 +1021,19 @@ void Filter::UpstreamRequest::decodeHeaders(Http::HeaderMapPtr&& headers, bool e upstream_headers_ = headers.get(); const uint64_t response_code = Http::Utility::getResponseStatus(*headers); stream_info_.response_code_ = static_cast(response_code); - parent_.onUpstreamHeaders(response_code, std::move(headers), end_stream); + parent_.onUpstreamHeaders(response_code, std::move(headers), *this, end_stream); } void Filter::UpstreamRequest::decodeData(Buffer::Instance& data, bool end_stream) { maybeEndDecode(end_stream); stream_info_.addBytesReceived(data.length()); - parent_.onUpstreamData(data, end_stream); + parent_.onUpstreamData(data, *this, end_stream); } void Filter::UpstreamRequest::decodeTrailers(Http::HeaderMapPtr&& trailers) { maybeEndDecode(true); upstream_trailers_ = trailers.get(); - parent_.onUpstreamTrailers(std::move(trailers)); + parent_.onUpstreamTrailers(std::move(trailers), *this); } void Filter::UpstreamRequest::decodeMetadata(Http::MetadataMapPtr&& metadata_map) { @@ -1093,7 +1102,7 @@ void Filter::UpstreamRequest::onResetStream(Http::StreamResetReason reason, clearRequestEncoder(); if (!calling_encode_headers_) { stream_info_.setResponseFlag(parent_.streamResetReasonToResponseFlag(reason)); - parent_.onUpstreamReset(reason, transport_failure_reason); + parent_.onUpstreamReset(reason, transport_failure_reason, *this); } else { deferred_reset_reason_ = reason; } @@ -1134,7 +1143,7 @@ void Filter::UpstreamRequest::onPerTryTimeout() { } resetStream(); stream_info_.setResponseFlag(StreamInfo::ResponseFlag::UpstreamRequestTimeout); - parent_.onPerTryTimeout(); + parent_.onPerTryTimeout(*this); } else { ENVOY_STREAM_LOG(debug, "ignored upstream per try timeout due to already started downstream response", @@ -1236,6 +1245,7 @@ void Filter::UpstreamRequest::clearRequestEncoder() { void Filter::UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHighWatermark() { ASSERT(parent_.request_encoder_); + ASSERT(parent_.parent_.upstream_requests_.size() == 1); // The downstream connection is overrun. Pause reads from upstream. parent_.parent_.cluster_->stats().upstream_flow_control_paused_reading_total_.inc(); parent_.request_encoder_->getStream().readDisable(true); @@ -1243,6 +1253,7 @@ void Filter::UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHigh void Filter::UpstreamRequest::DownstreamWatermarkManager::onBelowWriteBufferLowWatermark() { ASSERT(parent_.request_encoder_); + ASSERT(parent_.parent_.upstream_requests_.size() == 1); // The downstream connection has buffer available. Resume reads from upstream. parent_.parent_.cluster_->stats().upstream_flow_control_resumed_reading_total_.inc(); parent_.request_encoder_->getStream().readDisable(false); diff --git a/source/common/router/router.h b/source/common/router/router.h index 8bbd312974e90..bb39d1391a465 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -21,6 +21,7 @@ #include "common/buffer/watermark_buffer.h" #include "common/common/hash.h" #include "common/common/hex.h" +#include "common/common/linked_object.h" #include "common/common/logger.h" #include "common/config/well_known_names.h" #include "common/http/utility.h" @@ -268,7 +269,8 @@ class Filter : Logger::Loggable, private: struct UpstreamRequest : public Http::StreamDecoder, public Http::StreamCallbacks, - public Http::ConnectionPool::Callbacks { + public Http::ConnectionPool::Callbacks, + public LinkedObject { UpstreamRequest(Filter& parent, Http::ConnectionPool::Instance& pool); ~UpstreamRequest(); @@ -301,10 +303,12 @@ class Filter : Logger::Loggable, void onBelowWriteBufferLowWatermark() override { enableDataFromDownstream(); } void disableDataFromDownstream() { + ASSERT(parent_.upstream_requests_.size() == 1); parent_.cluster_->stats().upstream_flow_control_backed_up_total_.inc(); parent_.callbacks_->onDecoderFilterAboveWriteBufferHighWatermark(); } void enableDataFromDownstream() { + ASSERT(parent_.upstream_requests_.size() == 1); parent_.cluster_->stats().upstream_flow_control_drained_total_.inc(); parent_.callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); } @@ -371,8 +375,8 @@ class Filter : Logger::Loggable, Upstream::ResourcePriority priority) PURE; Http::ConnectionPool::Instance* getConnPool(); void maybeDoShadowing(); - bool maybeRetryReset(Http::StreamResetReason reset_reason); - void onPerTryTimeout(); + bool maybeRetryReset(Http::StreamResetReason reset_reason, UpstreamRequest& upstream_request); + void onPerTryTimeout(UpstreamRequest& upstream_request); void onRequestComplete(); void onResponseTimeout(); void onUpstream100ContinueHeaders(Http::HeaderMapPtr&& headers); @@ -383,20 +387,23 @@ class Filter : Logger::Loggable, // downstream if appropriate. void onUpstreamAbort(Http::Code code, StreamInfo::ResponseFlag response_flag, absl::string_view body, bool dropped); - void onUpstreamHeaders(uint64_t response_code, Http::HeaderMapPtr&& headers, bool end_stream); - void onUpstreamData(Buffer::Instance& data, bool end_stream); - void onUpstreamTrailers(Http::HeaderMapPtr&& trailers); + void onUpstreamHeaders(uint64_t response_code, Http::HeaderMapPtr&& headers, + UpstreamRequest& upstream_request, bool end_stream); + void onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request, bool end_stream); + void onUpstreamTrailers(Http::HeaderMapPtr&& trailers, UpstreamRequest& upstream_request); void onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map); - void onUpstreamComplete(); - void onUpstreamReset(Http::StreamResetReason reset_reason, absl::string_view transport_failure); + void onUpstreamComplete(UpstreamRequest& upstream_request); + void onUpstreamReset(Http::StreamResetReason reset_reason, absl::string_view transport_failure, + UpstreamRequest& upstream_request); void sendNoHealthyUpstreamResponse(); bool setupRetry(bool end_stream); - bool setupRedirect(const Http::HeaderMap& headers); - void updateOutlierDetection(Http::Code code); + bool setupRedirect(const Http::HeaderMap& headers, UpstreamRequest& upstream_request); + void updateOutlierDetection(Http::Code code, UpstreamRequest& upstream_request); void doRetry(); // Called immediately after a non-5xx header is received from upstream, performs stats accounting // and handle difference between gRPC and non-gRPC requests. - void handleNon5xxResponseHeaders(const Http::HeaderMap& headers, bool end_stream); + void handleNon5xxResponseHeaders(const Http::HeaderMap& headers, + UpstreamRequest& upstream_request, bool end_stream); TimeSource& timeSource() { return config_.timeSource(); } Http::Context& httpContext() { return config_.http_context_; } @@ -410,7 +417,7 @@ class Filter : Logger::Loggable, Event::TimerPtr response_timeout_; FilterUtility::TimeoutData timeout_; Http::Code timeout_response_code_ = Http::Code::GatewayTimeout; - UpstreamRequestPtr upstream_request_; + std::list upstream_requests_; bool grpc_request_{}; Http::HeaderMap* downstream_headers_{}; Http::HeaderMap* downstream_trailers_{}; diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index a282fc88cda50..0176ba270442f 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -2968,6 +2968,12 @@ TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOnWith MockDownstreamWatermarkCallbacks callbacks; EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks); + + // Ensures that when new callbacks are registered they get invoked immediately + // and the already-registered callbacks do not. + MockDownstreamWatermarkCallbacks callbacks2; + EXPECT_CALL(callbacks2, onAboveWriteBufferHighWatermark()); + decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks2); } } @@ -3086,10 +3092,13 @@ TEST_F(HttpConnectionManagerImplTest, HitFilterWatermarkLimits) { MockDownstreamWatermarkCallbacks callbacks; decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks); + MockDownstreamWatermarkCallbacks callbacks2; + decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks2); // Now overload the buffer with response data. The downstream watermark // callbacks should be called. EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); + EXPECT_CALL(callbacks2, onAboveWriteBufferHighWatermark()); Buffer::OwnedImpl fake_response("A long enough string to go over watermarks"); EXPECT_CALL(*encoder_filters_[1], encodeData(_, false)) .WillOnce(Return(FilterDataStatus::StopIterationAndWatermark)); @@ -3098,6 +3107,7 @@ TEST_F(HttpConnectionManagerImplTest, HitFilterWatermarkLimits) { // Change the limit so the buffered data is below the new watermark. buffer_len = encoder_filters_[1]->callbacks_->encodingBuffer()->length(); EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()); + EXPECT_CALL(callbacks2, onBelowWriteBufferLowWatermark()); encoder_filters_[1]->callbacks_->setEncoderBufferLimit((buffer_len + 1) * 2); }