diff --git a/api/envoy/config/cluster/v3/cluster.proto b/api/envoy/config/cluster/v3/cluster.proto index f4e3386cfac9a..90cd990a340a6 100644 --- a/api/envoy/config/cluster/v3/cluster.proto +++ b/api/envoy/config/cluster/v3/cluster.proto @@ -583,11 +583,10 @@ message Cluster { google.protobuf.Duration max_interval = 2 [(validate.rules).duration = {gt {nanos: 1000000}}]; } - // [#not-implemented-hide:] message PreconnectPolicy { // Indicates how many streams (rounded up) can be anticipated per-upstream for each // incoming stream. This is useful for high-QPS or latency-sensitive services. Preconnecting - // will only be done if the upstream is healthy. + // will only be done if the upstream is healthy and the cluster has traffic. // // For example if this is 2, for an incoming HTTP/1.1 stream, 2 connections will be // established, one for the new incoming stream, and one for a presumed follow-up stream. For @@ -605,8 +604,7 @@ message Cluster { // // If this value is not set, or set explicitly to one, Envoy will fetch as many connections // as needed to serve streams in flight. This means in steady state if a connection is torn down, - // a subsequent streams will pay an upstream-rtt latency penalty waiting for streams to be - // preconnected. + // a subsequent streams will pay an upstream-rtt latency penalty waiting for a new connection. // // This is limited somewhat arbitrarily to 3 because preconnecting too aggressively can // harm latency more than the preconnecting helps. @@ -616,24 +614,25 @@ message Cluster { // Indicates how many many streams (rounded up) can be anticipated across a cluster for each // stream, useful for low QPS services. This is currently supported for a subset of // deterministic non-hash-based load-balancing algorithms (weighted round robin, random). 
- // Unlike per_upstream_preconnect_ratio this preconnects across the upstream instances in a + // Unlike *per_upstream_preconnect_ratio* this preconnects across the upstream instances in a // cluster, doing best effort predictions of what upstream would be picked next and // pre-establishing a connection. // + // Preconnecting will be limited to one preconnect per configured upstream in the cluster and will + // only be done if there are healthy upstreams and the cluster has traffic. + // // For example if preconnecting is set to 2 for a round robin HTTP/2 cluster, on the first // incoming stream, 2 connections will be preconnected - one to the first upstream for this // cluster, one to the second on the assumption there will be a follow-up stream. // - // Preconnecting will be limited to one preconnect per configured upstream in the cluster. - // // If this value is not set, or set explicitly to one, Envoy will fetch as many connections // as needed to serve streams in flight, so during warm up and in steady state if a connection // is closed (and per_upstream_preconnect_ratio is not set), there will be a latency hit for // connection establishment. // // If both this and preconnect_ratio are set, Envoy will make sure both predicted needs are met, - // basically preconnecting max(predictive-preconnect, per-upstream-preconnect), for each upstream. - // TODO(alyssawilk) per LB docs and LB overview docs when unhiding. + // basically preconnecting max(predictive-preconnect, per-upstream-preconnect), for each + // upstream. google.protobuf.DoubleValue predictive_preconnect_ratio = 2 [(validate.rules).double = {lte: 3.0 gte: 1.0}]; } @@ -1028,7 +1027,6 @@ message Cluster { // Configuration to track optional cluster stats. TrackClusterStats track_cluster_stats = 49; - // [#not-implemented-hide:] // Preconnect configuration for this cluster. 
PreconnectPolicy preconnect_policy = 50; diff --git a/api/envoy/config/cluster/v4alpha/cluster.proto b/api/envoy/config/cluster/v4alpha/cluster.proto index 8b30ab23f265c..2d8aa4369b40f 100644 --- a/api/envoy/config/cluster/v4alpha/cluster.proto +++ b/api/envoy/config/cluster/v4alpha/cluster.proto @@ -588,14 +588,13 @@ message Cluster { google.protobuf.Duration max_interval = 2 [(validate.rules).duration = {gt {nanos: 1000000}}]; } - // [#not-implemented-hide:] message PreconnectPolicy { option (udpa.annotations.versioning).previous_message_type = "envoy.config.cluster.v3.Cluster.PreconnectPolicy"; // Indicates how many streams (rounded up) can be anticipated per-upstream for each // incoming stream. This is useful for high-QPS or latency-sensitive services. Preconnecting - // will only be done if the upstream is healthy. + // will only be done if the upstream is healthy and the cluster has traffic. // // For example if this is 2, for an incoming HTTP/1.1 stream, 2 connections will be // established, one for the new incoming stream, and one for a presumed follow-up stream. For @@ -613,8 +612,7 @@ message Cluster { // // If this value is not set, or set explicitly to one, Envoy will fetch as many connections // as needed to serve streams in flight. This means in steady state if a connection is torn down, - // a subsequent streams will pay an upstream-rtt latency penalty waiting for streams to be - // preconnected. + // a subsequent streams will pay an upstream-rtt latency penalty waiting for a new connection. // // This is limited somewhat arbitrarily to 3 because preconnecting too aggressively can // harm latency more than the preconnecting helps. @@ -624,24 +622,25 @@ message Cluster { // Indicates how many many streams (rounded up) can be anticipated across a cluster for each // stream, useful for low QPS services. This is currently supported for a subset of // deterministic non-hash-based load-balancing algorithms (weighted round robin, random). 
- // Unlike per_upstream_preconnect_ratio this preconnects across the upstream instances in a + // Unlike *per_upstream_preconnect_ratio* this preconnects across the upstream instances in a // cluster, doing best effort predictions of what upstream would be picked next and // pre-establishing a connection. // + // Preconnecting will be limited to one preconnect per configured upstream in the cluster and will + // only be done if there are healthy upstreams and the cluster has traffic. + // // For example if preconnecting is set to 2 for a round robin HTTP/2 cluster, on the first // incoming stream, 2 connections will be preconnected - one to the first upstream for this // cluster, one to the second on the assumption there will be a follow-up stream. // - // Preconnecting will be limited to one preconnect per configured upstream in the cluster. - // // If this value is not set, or set explicitly to one, Envoy will fetch as many connections // as needed to serve streams in flight, so during warm up and in steady state if a connection // is closed (and per_upstream_preconnect_ratio is not set), there will be a latency hit for // connection establishment. // // If both this and preconnect_ratio are set, Envoy will make sure both predicted needs are met, - // basically preconnecting max(predictive-preconnect, per-upstream-preconnect), for each upstream. - // TODO(alyssawilk) per LB docs and LB overview docs when unhiding. + // basically preconnecting max(predictive-preconnect, per-upstream-preconnect), for each + // upstream. google.protobuf.DoubleValue predictive_preconnect_ratio = 2 [(validate.rules).double = {lte: 3.0 gte: 1.0}]; } @@ -968,7 +967,6 @@ message Cluster { // Configuration to track optional cluster stats. TrackClusterStats track_cluster_stats = 49; - // [#not-implemented-hide:] // Preconnect configuration for this cluster. 
PreconnectPolicy preconnect_policy = 50; diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 4367cea808065..70805de8eb6cb 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -31,6 +31,7 @@ Removed Config or Runtime New Features ------------ * access log: added the :ref:`formatters ` extension point for custom formatters (command operators). +* http: added support for :ref:`preconnecting `. Preconnecting is off by default, but recommended for clusters serving latency-sensitive traffic, especially if using HTTP/1.1. * tcp_proxy: add support for converting raw TCP streams into HTTP/1.1 CONNECT requests. See :ref:`upgrade documentation ` for details. Deprecated diff --git a/generated_api_shadow/envoy/config/cluster/v3/cluster.proto b/generated_api_shadow/envoy/config/cluster/v3/cluster.proto index cce56c9a1e33f..f3859bf35d221 100644 --- a/generated_api_shadow/envoy/config/cluster/v3/cluster.proto +++ b/generated_api_shadow/envoy/config/cluster/v3/cluster.proto @@ -583,11 +583,10 @@ message Cluster { google.protobuf.Duration max_interval = 2 [(validate.rules).duration = {gt {nanos: 1000000}}]; } - // [#not-implemented-hide:] message PreconnectPolicy { // Indicates how many streams (rounded up) can be anticipated per-upstream for each // incoming stream. This is useful for high-QPS or latency-sensitive services. Preconnecting - // will only be done if the upstream is healthy. + // will only be done if the upstream is healthy and the cluster has traffic. // // For example if this is 2, for an incoming HTTP/1.1 stream, 2 connections will be // established, one for the new incoming stream, and one for a presumed follow-up stream. For @@ -605,8 +604,7 @@ message Cluster { // // If this value is not set, or set explicitly to one, Envoy will fetch as many connections // as needed to serve streams in flight. 
This means in steady state if a connection is torn down, - // a subsequent streams will pay an upstream-rtt latency penalty waiting for streams to be - // preconnected. + // a subsequent streams will pay an upstream-rtt latency penalty waiting for a new connection. // // This is limited somewhat arbitrarily to 3 because preconnecting too aggressively can // harm latency more than the preconnecting helps. @@ -616,24 +614,25 @@ message Cluster { // Indicates how many many streams (rounded up) can be anticipated across a cluster for each // stream, useful for low QPS services. This is currently supported for a subset of // deterministic non-hash-based load-balancing algorithms (weighted round robin, random). - // Unlike per_upstream_preconnect_ratio this preconnects across the upstream instances in a + // Unlike *per_upstream_preconnect_ratio* this preconnects across the upstream instances in a // cluster, doing best effort predictions of what upstream would be picked next and // pre-establishing a connection. // + // Preconnecting will be limited to one preconnect per configured upstream in the cluster and will + // only be done if there are healthy upstreams and the cluster has traffic. + // // For example if preconnecting is set to 2 for a round robin HTTP/2 cluster, on the first // incoming stream, 2 connections will be preconnected - one to the first upstream for this // cluster, one to the second on the assumption there will be a follow-up stream. // - // Preconnecting will be limited to one preconnect per configured upstream in the cluster. - // // If this value is not set, or set explicitly to one, Envoy will fetch as many connections // as needed to serve streams in flight, so during warm up and in steady state if a connection // is closed (and per_upstream_preconnect_ratio is not set), there will be a latency hit for // connection establishment. 
// // If both this and preconnect_ratio are set, Envoy will make sure both predicted needs are met, - // basically preconnecting max(predictive-preconnect, per-upstream-preconnect), for each upstream. - // TODO(alyssawilk) per LB docs and LB overview docs when unhiding. + // basically preconnecting max(predictive-preconnect, per-upstream-preconnect), for each + // upstream. google.protobuf.DoubleValue predictive_preconnect_ratio = 2 [(validate.rules).double = {lte: 3.0 gte: 1.0}]; } @@ -1026,7 +1025,6 @@ message Cluster { // Configuration to track optional cluster stats. TrackClusterStats track_cluster_stats = 49; - // [#not-implemented-hide:] // Preconnect configuration for this cluster. PreconnectPolicy preconnect_policy = 50; diff --git a/generated_api_shadow/envoy/config/cluster/v4alpha/cluster.proto b/generated_api_shadow/envoy/config/cluster/v4alpha/cluster.proto index 172be74b46bce..9fb018b4ee69d 100644 --- a/generated_api_shadow/envoy/config/cluster/v4alpha/cluster.proto +++ b/generated_api_shadow/envoy/config/cluster/v4alpha/cluster.proto @@ -589,14 +589,13 @@ message Cluster { google.protobuf.Duration max_interval = 2 [(validate.rules).duration = {gt {nanos: 1000000}}]; } - // [#not-implemented-hide:] message PreconnectPolicy { option (udpa.annotations.versioning).previous_message_type = "envoy.config.cluster.v3.Cluster.PreconnectPolicy"; // Indicates how many streams (rounded up) can be anticipated per-upstream for each // incoming stream. This is useful for high-QPS or latency-sensitive services. Preconnecting - // will only be done if the upstream is healthy. + // will only be done if the upstream is healthy and the cluster has traffic. // // For example if this is 2, for an incoming HTTP/1.1 stream, 2 connections will be // established, one for the new incoming stream, and one for a presumed follow-up stream. 
For @@ -614,8 +613,7 @@ message Cluster { // // If this value is not set, or set explicitly to one, Envoy will fetch as many connections // as needed to serve streams in flight. This means in steady state if a connection is torn down, - // a subsequent streams will pay an upstream-rtt latency penalty waiting for streams to be - // preconnected. + // a subsequent streams will pay an upstream-rtt latency penalty waiting for a new connection. // // This is limited somewhat arbitrarily to 3 because preconnecting too aggressively can // harm latency more than the preconnecting helps. @@ -625,24 +623,25 @@ message Cluster { // Indicates how many many streams (rounded up) can be anticipated across a cluster for each // stream, useful for low QPS services. This is currently supported for a subset of // deterministic non-hash-based load-balancing algorithms (weighted round robin, random). - // Unlike per_upstream_preconnect_ratio this preconnects across the upstream instances in a + // Unlike *per_upstream_preconnect_ratio* this preconnects across the upstream instances in a // cluster, doing best effort predictions of what upstream would be picked next and // pre-establishing a connection. // + // Preconnecting will be limited to one preconnect per configured upstream in the cluster and will + // only be done if there are healthy upstreams and the cluster has traffic. + // // For example if preconnecting is set to 2 for a round robin HTTP/2 cluster, on the first // incoming stream, 2 connections will be preconnected - one to the first upstream for this // cluster, one to the second on the assumption there will be a follow-up stream. // - // Preconnecting will be limited to one preconnect per configured upstream in the cluster. 
- // // If this value is not set, or set explicitly to one, Envoy will fetch as many connections // as needed to serve streams in flight, so during warm up and in steady state if a connection // is closed (and per_upstream_preconnect_ratio is not set), there will be a latency hit for // connection establishment. // // If both this and preconnect_ratio are set, Envoy will make sure both predicted needs are met, - // basically preconnecting max(predictive-preconnect, per-upstream-preconnect), for each upstream. - // TODO(alyssawilk) per LB docs and LB overview docs when unhiding. + // basically preconnecting max(predictive-preconnect, per-upstream-preconnect), for each + // upstream. google.protobuf.DoubleValue predictive_preconnect_ratio = 2 [(validate.rules).double = {lte: 3.0 gte: 1.0}]; } @@ -1040,7 +1039,6 @@ message Cluster { // Configuration to track optional cluster stats. TrackClusterStats track_cluster_stats = 49; - // [#not-implemented-hide:] // Preconnect configuration for this cluster. PreconnectPolicy preconnect_policy = 50; diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index a5df5aadc0fd8..d94aafa389004 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -35,6 +35,28 @@ void ConnPoolImplBase::destructAllConnections() { dispatcher_.clearDeferredDeleteList(); } +bool ConnPoolImplBase::shouldConnect(size_t pending_streams, size_t active_streams, + uint32_t connecting_and_connected_capacity, + float preconnect_ratio, bool anticipate_incoming_stream) { + // This is set to true any time global preconnect is being calculated. + // ClusterManagerImpl::maybePreconnect is called directly before a stream is created, so the + // stream must be anticipated. + // + // Also without this, we would never pre-establish a connection as the first + // connection in a pool because pending/active streams could both be 0. 
+ int anticipated_streams = anticipate_incoming_stream ? 1 : 0; + + // The number of streams we want to be provisioned for is the number of + // pending, active, and anticipated streams times the preconnect ratio. + // The number of streams we are (theoretically) provisioned for is the + // connecting stream capacity plus the number of active streams. + // + // If preconnect ratio is not set, it defaults to 1, and this simplifies to the + // legacy value of pending_streams_.size() > connecting_stream_capacity_ + return (pending_streams + active_streams + anticipated_streams) * preconnect_ratio > + connecting_and_connected_capacity + active_streams; +} + bool ConnPoolImplBase::shouldCreateNewConnection(float global_preconnect_ratio) const { // If the host is not healthy, don't make it do extra work, especially as // upstream selection logic may result in bypassing this upstream entirely. @@ -44,25 +66,25 @@ bool ConnPoolImplBase::shouldCreateNewConnection(float global_preconnect_ratio) return pending_streams_.size() > connecting_stream_capacity_; } - // If global preconnecting is on, and this connection is within the global - // preconnect limit, preconnect. - // We may eventually want to track preconnect_attempts to allow more preconnecting for - // heavily weighted upstreams or sticky picks. - if (global_preconnect_ratio > 1.0 && - ((pending_streams_.size() + 1 + num_active_streams_) * global_preconnect_ratio > - (connecting_stream_capacity_ + num_active_streams_))) { - return true; + // Determine if we are trying to prefetch for global preconnect or local preconnect. + if (global_preconnect_ratio != 0) { + // If global preconnecting is on, and this connection is within the global + // preconnect limit, preconnect. + // For global preconnect, we anticipate an incoming stream to this pool, since it is + // prefetching for the next upcoming stream, which will likely be assigned to this pool. 
+ // We may eventually want to track preconnect_attempts to allow more preconnecting for + // heavily weighted upstreams or sticky picks. + return shouldConnect(pending_streams_.size(), num_active_streams_, connecting_stream_capacity_, + global_preconnect_ratio, true); + } else { + // Ensure this local pool has adequate connections for the given load. + // + // Local preconnect does not need to anticipate a stream. It is called as + // new streams are established or torn down and simply attempts to maintain + // the correct ratio of streams and anticipated capacity. + return shouldConnect(pending_streams_.size(), num_active_streams_, connecting_stream_capacity_, + perUpstreamPreconnectRatio()); } - - // The number of streams we want to be provisioned for is the number of - // pending and active streams times the preconnect ratio. - // The number of streams we are (theoretically) provisioned for is the - // connecting stream capacity plus the number of active streams. - // - // If preconnect ratio is not set, it defaults to 1, and this simplifies to the - // legacy value of pending_streams_.size() > connecting_stream_capacity_ - return (pending_streams_.size() + num_active_streams_) * perUpstreamPreconnectRatio() > - (connecting_stream_capacity_ + num_active_streams_); } float ConnPoolImplBase::perUpstreamPreconnectRatio() const { @@ -189,8 +211,7 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context) ActiveClient& client = *ready_clients_.front(); ENVOY_CONN_LOG(debug, "using existing connection", client); attachStreamToClient(client, context); - // Even if there's a ready client, we may want to preconnect a new connection - // to handle the next incoming stream. + // Even if there's a ready client, we may want to preconnect to handle the next incoming stream. 
tryCreateNewConnections(); return nullptr; } @@ -368,7 +389,7 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view // NOTE: We move the existing pending streams to a temporary list. This is done so that // if retry logic submits a new stream to the pool, we don't fail it inline. purgePendingStreams(client.real_host_description_, failure_reason, reason); - // See if we should preconnect another connection based on active connections. + // See if we should preconnect based on active connections. tryCreateNewConnections(); } diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index c2d8ef7da67ef..38730c238e01c 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -122,6 +122,15 @@ class ConnPoolImplBase : protected Logger::Loggable { return *static_cast(&context); } + // Determines if prefetching is warranted based on the number of streams in + // use, pending streams, anticipated capacity, and preconnect configuration. + // + // If anticipate_incoming_stream is true this assumes a call to newStream is + // pending, which is true for global preconnect. + static bool shouldConnect(size_t pending_streams, size_t active_streams, + uint32_t connecting_and_connected_capacity, float preconnect_ratio, + bool anticipate_incoming_stream = false); + void addDrainedCallbackImpl(Instance::DrainedCb cb); void drainConnectionsImpl(); @@ -158,8 +167,7 @@ class ConnPoolImplBase : protected Logger::Loggable { void checkForDrained(); void scheduleOnUpstreamReady(); ConnectionPool::Cancellable* newStream(AttachContext& context); - // Called if this pool is likely to be picked soon, to determine if it's worth - // preconnecting a connection. + // Called if this pool is likely to be picked soon, to determine if it's worth preconnecting. 
bool maybePreconnect(float global_preconnect_ratio); virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context) PURE; @@ -185,6 +193,7 @@ class ConnPoolImplBase : protected Logger::Loggable { bool hasPendingStreams() const { return !pending_streams_.empty(); } protected: + // Creates up to 3 connections, based on the preconnect ratio. virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {} // Creates up to 3 connections, based on the preconnect ratio. diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index ebf842ee95ddd..35ac22b481958 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -846,25 +846,32 @@ ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view void ClusterManagerImpl::maybePreconnect( ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry, + const ClusterConnectivityState& state, std::function pick_preconnect_pool) { - // TODO(alyssawilk) As currently implemented, this will always just preconnect - // one connection ahead of actually needed connections. - // - // Instead we want to track the following metrics across the entire connection - // pool and use the same algorithm we do for per-upstream preconnect: - // ((pending_streams_ + num_active_streams_) * global_preconnect_ratio > - // (connecting_stream_capacity_ + num_active_streams_))) - // and allow multiple preconnects per pick. - // Also cap preconnects such that - // num_unused_preconnect < num hosts - // since if we have more preconnects than hosts, we should consider kicking into - // per-upstream preconnect. - // - // Once we do this, this should loop capped number of times while shouldPreconnect is true. - if (cluster_entry.cluster_info_->peekaheadRatio() > 1.0) { + auto peekahead_ratio = cluster_entry.cluster_info_->peekaheadRatio(); + if (peekahead_ratio <= 1.0) { + return; + } + + // 3 here is arbitrary. 
Just as in ConnPoolImplBase::tryCreateNewConnections + // we want to limit the work which can be done on any given preconnect attempt. + for (int i = 0; i < 3; ++i) { + // See if adding this one new connection + // would put the cluster over desired capacity. If so, stop preconnecting. + // + // We anticipate the incoming stream here, because maybePreconnect is called + // before a new stream is established. + if (!ConnectionPool::ConnPoolImplBase::shouldConnect( + state.pending_streams_, state.active_streams_, state.connecting_stream_capacity_, + peekahead_ratio, true)) { + return; + } ConnectionPool::Instance* preconnect_pool = pick_preconnect_pool(); - if (preconnect_pool) { - preconnect_pool->maybePreconnect(cluster_entry.cluster_info_->peekaheadRatio()); + if (!preconnect_pool || !preconnect_pool->maybePreconnect(peekahead_ratio)) { + // Given that the next preconnect pick may be entirely different, we could + // opt to try again even if the first preconnect fails. Err on the side of + // caution and wait for the next attempt. + return; } } } @@ -882,10 +889,9 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( // performed here in anticipation of the new stream. // TODO(alyssawilk) refactor to have one function call and return a pair, so this invariant is // code-enforced. - maybePreconnect(*this, [this, &priority, &protocol, &context]() { + maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &protocol, &context]() { return connPool(priority, protocol, context, true); }); - return ret; } @@ -901,7 +907,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( // TODO(alyssawilk) refactor to have one function call and return a pair, so this invariant is // code-enforced. // Now see if another host should be preconnected. 
- maybePreconnect(*this, + maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() { return tcpConnPool(priority, context, true); }); return ret; @@ -1246,7 +1252,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( if (host->cluster().features() & ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) { - // Close non connection pool TCP connections obtained from tcpConnForCluster() + // Close non connection pool TCP connections obtained from tcpConn() // // TODO(jono): The only remaining user of the non-pooled connections seems to be the statsd // TCP client. Perhaps it could be rewritten to use a connection pool, and this code deleted. @@ -1357,8 +1363,10 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( LoadBalancerContext* context, bool peek) { HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context)); if (!host) { - ENVOY_LOG(debug, "no healthy host for HTTP connection pool"); - cluster_info_->stats().upstream_cx_none_healthy_.inc(); + if (!peek) { + ENVOY_LOG(debug, "no healthy host for HTTP connection pool"); + cluster_info_->stats().upstream_cx_none_healthy_.inc(); + } return nullptr; } @@ -1426,8 +1434,10 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( ResourcePriority priority, LoadBalancerContext* context, bool peek) { HostConstSharedPtr host = (peek ? 
lb_->peekAnotherHost(context) : lb_->chooseHost(context)); if (!host) { - ENVOY_LOG(debug, "no healthy host for TCP connection pool"); - cluster_info_->stats().upstream_cx_none_healthy_.inc(); + if (!peek) { + ENVOY_LOG(debug, "no healthy host for TCP connection pool"); + cluster_info_->stats().upstream_cx_none_healthy_.inc(); + } return nullptr; } diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 3cd527ec55c12..98d04ac2d7345 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -563,6 +563,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable preconnect_pool); ClusterManagerFactory& factory_; diff --git a/source/common/upstream/load_balancer_impl.cc b/source/common/upstream/load_balancer_impl.cc index 1297da36f1630..9e87d2edbfa21 100644 --- a/source/common/upstream/load_balancer_impl.cc +++ b/source/common/upstream/load_balancer_impl.cc @@ -22,6 +22,12 @@ static const std::string RuntimeZoneEnabled = "upstream.zone_routing.enabled"; static const std::string RuntimeMinClusterSize = "upstream.zone_routing.min_cluster_size"; static const std::string RuntimePanicThreshold = "upstream.healthy_panic_threshold"; +bool tooManyPreconnects(size_t num_preconnect_picks, uint32_t healthy_hosts) { + // Currently we only allow the number of preconnected connections to equal the + // number of healthy hosts. + return num_preconnect_picks >= healthy_hosts; +} + // Distributes load between priorities based on the per priority availability and the normalized // total availability. Load is assigned to each priority according to how available each priority is // adjusted for the normalized total availability. 
@@ -108,16 +114,16 @@ LoadBalancerBase::LoadBalancerBase( priority_set_(priority_set) { for (auto& host_set : priority_set_.hostSetsPerPriority()) { recalculatePerPriorityState(host_set->priority(), priority_set_, per_priority_load_, - per_priority_health_, per_priority_degraded_); + per_priority_health_, per_priority_degraded_, total_healthy_hosts_); } // Recalculate panic mode for all levels. recalculatePerPriorityPanic(); - priority_set_.addPriorityUpdateCb( - [this](uint32_t priority, const HostVector&, const HostVector&) -> void { - recalculatePerPriorityState(priority, priority_set_, per_priority_load_, - per_priority_health_, per_priority_degraded_); - }); + priority_set_.addPriorityUpdateCb([this](uint32_t priority, const HostVector&, + const HostVector&) -> void { + recalculatePerPriorityState(priority, priority_set_, per_priority_load_, per_priority_health_, + per_priority_degraded_, total_healthy_hosts_); + }); priority_set_.addPriorityUpdateCb( [this](uint32_t priority, const HostVector&, const HostVector&) -> void { @@ -146,11 +152,13 @@ void LoadBalancerBase::recalculatePerPriorityState(uint32_t priority, const PrioritySet& priority_set, HealthyAndDegradedLoad& per_priority_load, HealthyAvailability& per_priority_health, - DegradedAvailability& per_priority_degraded) { + DegradedAvailability& per_priority_degraded, + uint32_t& total_healthy_hosts) { per_priority_load.healthy_priority_load_.get().resize(priority_set.hostSetsPerPriority().size()); per_priority_load.degraded_priority_load_.get().resize(priority_set.hostSetsPerPriority().size()); per_priority_health.get().resize(priority_set.hostSetsPerPriority().size()); per_priority_degraded.get().resize(priority_set.hostSetsPerPriority().size()); + total_healthy_hosts = 0; // Determine the health of the newly modified priority level. 
// Health ranges from 0-100, and is the ratio of healthy/degraded hosts to total hosts, modified @@ -232,6 +240,10 @@ void LoadBalancerBase::recalculatePerPriorityState(uint32_t priority, per_priority_load.healthy_priority_load_.get().end(), 0) + std::accumulate(per_priority_load.degraded_priority_load_.get().begin(), per_priority_load.degraded_priority_load_.get().end(), 0)); + + for (auto& host_set : priority_set.hostSetsPerPriority()) { + total_healthy_hosts += host_set->healthyHosts().size(); + } } // Method iterates through priority levels and turns on/off panic mode. @@ -774,6 +786,10 @@ void EdfLoadBalancerBase::refresh(uint32_t priority) { } HostConstSharedPtr EdfLoadBalancerBase::peekAnotherHost(LoadBalancerContext* context) { + if (tooManyPreconnects(stashed_random_.size(), total_healthy_hosts_)) { + return nullptr; + } + const absl::optional hosts_source = hostSourceToUse(context, random(true)); if (!hosts_source) { return nullptr; @@ -859,6 +875,9 @@ HostConstSharedPtr LeastRequestLoadBalancer::unweightedHostPick(const HostVector } HostConstSharedPtr RandomLoadBalancer::peekAnotherHost(LoadBalancerContext* context) { + if (tooManyPreconnects(stashed_random_.size(), total_healthy_hosts_)) { + return nullptr; + } return peekOrChoose(context, true); } diff --git a/source/common/upstream/load_balancer_impl.h b/source/common/upstream/load_balancer_impl.h index fab367067255c..add54d9e9dac7 100644 --- a/source/common/upstream/load_balancer_impl.h +++ b/source/common/upstream/load_balancer_impl.h @@ -121,7 +121,8 @@ class LoadBalancerBase : public LoadBalancer { void static recalculatePerPriorityState(uint32_t priority, const PrioritySet& priority_set, HealthyAndDegradedLoad& priority_load, HealthyAvailability& per_priority_health, - DegradedAvailability& per_priority_degraded); + DegradedAvailability& per_priority_degraded, + uint32_t& total_healthy_hosts); void recalculatePerPriorityPanic(); protected: @@ -154,6 +155,8 @@ class LoadBalancerBase : public 
LoadBalancer { DegradedAvailability per_priority_degraded_; // Levels which are in panic std::vector per_priority_panic_; + // The total count of healthy hosts across all priority levels. + uint32_t total_healthy_hosts_; }; class LoadBalancerContextBase : public LoadBalancerContext { diff --git a/source/extensions/retry/priority/previous_priorities/previous_priorities.h b/source/extensions/retry/priority/previous_priorities/previous_priorities.h index 05e4f3db37a24..f626f664a04ca 100644 --- a/source/extensions/retry/priority/previous_priorities/previous_priorities.h +++ b/source/extensions/retry/priority/previous_priorities/previous_priorities.h @@ -29,7 +29,8 @@ class PreviousPrioritiesRetryPriority : public Upstream::RetryPriority { void recalculatePerPriorityState(uint32_t priority, const Upstream::PrioritySet& priority_set) { // Recalculate health and priority the same way the load balancer does it. Upstream::LoadBalancerBase::recalculatePerPriorityState( - priority, priority_set, per_priority_load_, per_priority_health_, per_priority_degraded_); + priority, priority_set, per_priority_load_, per_priority_health_, per_priority_degraded_, + total_healthy_hosts_); } uint32_t adjustedAvailability(std::vector& per_priority_health, @@ -47,6 +48,7 @@ class PreviousPrioritiesRetryPriority : public Upstream::RetryPriority { Upstream::HealthyAndDegradedLoad per_priority_load_; Upstream::HealthyAvailability per_priority_health_; Upstream::DegradedAvailability per_priority_degraded_; + uint32_t total_healthy_hosts_; }; } // namespace Priority diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 1e6a9175c5011..83c403131fb82 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -28,6 +28,7 @@ using ::testing::DoAll; using ::testing::Eq; using ::testing::InSequence; using ::testing::Invoke; +using ::testing::InvokeWithoutArgs; using 
::testing::Mock; using ::testing::NiceMock; using ::testing::Return; @@ -4220,8 +4221,10 @@ class PreconnectTest : public ClusterManagerImplTest { // Set up the HostSet. host1_ = makeTestHost(cluster_->info(), "tcp://127.0.0.1:80", time_system_); host2_ = makeTestHost(cluster_->info(), "tcp://127.0.0.1:80", time_system_); + host3_ = makeTestHost(cluster_->info(), "tcp://127.0.0.1:80", time_system_); + host4_ = makeTestHost(cluster_->info(), "tcp://127.0.0.1:80", time_system_); - HostVector hosts{host1_, host2_}; + HostVector hosts{host1_, host2_, host3_, host4_}; auto hosts_ptr = std::make_shared(hosts); // Sending non-mergeable updates. @@ -4233,6 +4236,8 @@ class PreconnectTest : public ClusterManagerImplTest { Cluster* cluster_{}; HostSharedPtr host1_; HostSharedPtr host2_; + HostSharedPtr host3_; + HostSharedPtr host4_; }; TEST_F(PreconnectTest, PreconnectOff) { @@ -4270,6 +4275,96 @@ TEST_F(PreconnectTest, PreconnectOn) { ->tcpConnPool(ResourcePriority::Default, nullptr); } +TEST_F(PreconnectTest, PreconnectHighHttp) { + // With preconnect set to 3, the first request will kick off 3 preconnect attempts. + initialize(3); + int http_preconnect = 0; + EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _)) + .Times(4) + .WillRepeatedly(InvokeWithoutArgs([&]() -> Http::ConnectionPool::Instance* { + auto* ret = new NiceMock(); + ON_CALL(*ret, maybePreconnect(_)).WillByDefault(InvokeWithoutArgs([&]() -> bool { + ++http_preconnect; + return true; + })); + return ret; + })); + cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + // Expect preconnect to be called 3 times across the four hosts. + EXPECT_EQ(3, http_preconnect); +} + +TEST_F(PreconnectTest, PreconnectHighTcp) { + // With preconnect set to 3, the first request will kick off 3 preconnect attempts. 
+ initialize(3); + int tcp_preconnect = 0; + EXPECT_CALL(factory_, allocateTcpConnPool_(_)) + .Times(4) + .WillRepeatedly(InvokeWithoutArgs([&]() -> Tcp::ConnectionPool::Instance* { + auto* ret = new NiceMock(); + ON_CALL(*ret, maybePreconnect(_)).WillByDefault(InvokeWithoutArgs([&]() -> bool { + ++tcp_preconnect; + return true; + })); + return ret; + })); + cluster_manager_->getThreadLocalCluster("cluster_1") + ->tcpConnPool(ResourcePriority::Default, nullptr); + // Expect preconnect to be called 3 times across the four hosts. + EXPECT_EQ(3, tcp_preconnect); +} + +TEST_F(PreconnectTest, PreconnectCappedAt3) { + // With preconnect set to 20, no more than 3 connections will be preconnected. + initialize(20); + int http_preconnect = 0; + EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _)) + .Times(4) + .WillRepeatedly(InvokeWithoutArgs([&]() -> Http::ConnectionPool::Instance* { + auto* ret = new NiceMock(); + ON_CALL(*ret, maybePreconnect(_)).WillByDefault(InvokeWithoutArgs([&]() -> bool { + ++http_preconnect; + return true; + })); + return ret; + })); + cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + // Expect preconnect to be called 3 times across the four hosts. + EXPECT_EQ(3, http_preconnect); + + // A subsequent call to get a connection will consume one of the preconnected + // connections, leaving two in queue, and kick off 2 more. This time we won't + // do the full 3 as the number of outstanding preconnects is limited by the + // number of healthy hosts. + http_preconnect = 0; + cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + EXPECT_EQ(2, http_preconnect); +} + +TEST_F(PreconnectTest, PreconnectCappedByMaybePreconnect) { + // Set preconnect high, and verify preconnecting stops when maybePreconnect returns false. 
+ initialize(20); + int http_preconnect_calls = 0; + EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _)) + .Times(2) + .WillRepeatedly(InvokeWithoutArgs([&]() -> Http::ConnectionPool::Instance* { + auto* ret = new NiceMock(); + ON_CALL(*ret, maybePreconnect(_)).WillByDefault(InvokeWithoutArgs([&]() -> bool { + ++http_preconnect_calls; + // Force maybe preconnect to fail. + return false; + })); + return ret; + })); + cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr); + // Expect preconnect to be called once and then preconnecting is stopped. + EXPECT_EQ(1, http_preconnect_calls); +} + } // namespace } // namespace Upstream } // namespace Envoy diff --git a/test/common/upstream/load_balancer_impl_test.cc b/test/common/upstream/load_balancer_impl_test.cc index 63f981cd7ab69..6135f6f9534a2 100644 --- a/test/common/upstream/load_balancer_impl_test.cc +++ b/test/common/upstream/load_balancer_impl_test.cc @@ -596,7 +596,8 @@ TEST_P(FailoverTest, BasicDegradedHosts) { host_set_.degraded_hosts_ = host_set_.hosts_; failover_host_set_.hosts_ = failover_host_set_.healthy_hosts_; init(false); - EXPECT_EQ(host_set_.degraded_hosts_[0], lb_->peekAnotherHost(nullptr)); + // We don't preconnect degraded hosts. + EXPECT_EQ(nullptr, lb_->peekAnotherHost(nullptr)); EXPECT_EQ(host_set_.degraded_hosts_[0], lb_->chooseHost(nullptr)); } @@ -802,7 +803,8 @@ TEST_P(RoundRobinLoadBalancerTest, Normal) { hostSet().healthy_hosts_.push_back(makeTestHost(info_, "tcp://127.0.0.1:82", simTime())); hostSet().hosts_.push_back(hostSet().healthy_hosts_.back()); hostSet().runCallbacks({hostSet().healthy_hosts_.back()}, {}); - peekThenPick({2, 0, 1, 2}); + peekThenPick({2, 0, 1}); + peekThenPick({2}); // Now peek a few extra to push the index forward, alter the host set, and // make sure the index is restored to 0. 
@@ -812,7 +814,8 @@ TEST_P(RoundRobinLoadBalancerTest, Normal) { hostSet().healthy_hosts_.push_back(makeTestHost(info_, "tcp://127.0.0.1:83", simTime())); hostSet().hosts_.push_back(hostSet().healthy_hosts_.back()); hostSet().runCallbacks({hostSet().healthy_hosts_.back()}, {hostSet().healthy_hosts_.front()}); - peekThenPick({1, 2, 3}); + EXPECT_EQ(hostSet().healthy_hosts_[1], lb_->chooseHost(nullptr)); + peekThenPick({2, 3}); } // Validate that the RNG seed influences pick order. diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index 0553b16481a60..f3044030a16b3 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -369,31 +369,39 @@ void HttpIntegrationTest::verifyResponse(IntegrationStreamDecoderPtr response, EXPECT_EQ(response->body(), expected_body); } +absl::optional HttpIntegrationTest::waitForNextUpstreamConnection( + const std::vector& upstream_indices, + std::chrono::milliseconds connection_wait_timeout, + FakeHttpConnectionPtr& fake_upstream_connection) { + AssertionResult result = AssertionFailure(); + int upstream_index = 0; + Event::TestTimeSystem::RealTimeBound bound(connection_wait_timeout); + // Loop over the upstreams until the call times out or an upstream request is received. 
+ while (!result) { + upstream_index = upstream_index % upstream_indices.size(); + result = fake_upstreams_[upstream_indices[upstream_index]]->waitForHttpConnection( + *dispatcher_, fake_upstream_connection, std::chrono::milliseconds(5), + max_request_headers_kb_, max_request_headers_count_); + if (result) { + return upstream_index; + } else if (!bound.withinBound()) { + RELEASE_ASSERT(0, "Timed out waiting for new connection."); + break; + } + ++upstream_index; + } + RELEASE_ASSERT(result, result.message()); + return {}; +} + absl::optional HttpIntegrationTest::waitForNextUpstreamRequest(const std::vector& upstream_indices, std::chrono::milliseconds connection_wait_timeout) { absl::optional upstream_with_request; // If there is no upstream connection, wait for it to be established. if (!fake_upstream_connection_) { - AssertionResult result = AssertionFailure(); - int upstream_index = 0; - Event::TestTimeSystem::RealTimeBound bound(connection_wait_timeout); - // Loop over the upstreams until the call times out or an upstream request is received. - while (!result) { - upstream_index = upstream_index % upstream_indices.size(); - result = fake_upstreams_[upstream_indices[upstream_index]]->waitForHttpConnection( - *dispatcher_, fake_upstream_connection_, std::chrono::milliseconds(5), - max_request_headers_kb_, max_request_headers_count_); - if (result) { - upstream_with_request = upstream_index; - break; - } else if (!bound.withinBound()) { - result = (AssertionFailure() << "Timed out waiting for new connection."); - break; - } - ++upstream_index; - } - RELEASE_ASSERT(result, result.message()); + upstream_with_request = waitForNextUpstreamConnection(upstream_indices, connection_wait_timeout, + fake_upstream_connection_); } // Wait for the next stream on the upstream connection. 
AssertionResult result = diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 13b91528e36ce..5ee7de6579978 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -149,6 +149,11 @@ class HttpIntegrationTest : public BaseIntegrationTest { uint64_t upstream_index = 0, std::chrono::milliseconds connection_wait_timeout = TestUtility::DefaultTimeout); + absl::optional + waitForNextUpstreamConnection(const std::vector& upstream_indices, + std::chrono::milliseconds connection_wait_timeout, + FakeHttpConnectionPtr& fake_upstream_connection); + // Close |codec_client_| and |fake_upstream_connection_| cleanly. void cleanupUpstreamAndDownstream(); diff --git a/test/integration/integration_test.cc b/test/integration/integration_test.cc index 348d5f22fc099..d488538bc56d5 100644 --- a/test/integration/integration_test.cc +++ b/test/integration/integration_test.cc @@ -1800,4 +1800,117 @@ TEST_P(IntegrationTest, ConnectionIsTerminatedIfHCMStreamErrorIsFalseAndOverride EXPECT_EQ("400", response->headers().getStatusValue()); } +TEST_P(IntegrationTest, Preconnect) { + config_helper_.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + bootstrap.mutable_static_resources() + ->mutable_clusters(0) + ->mutable_preconnect_policy() + ->mutable_predictive_preconnect_ratio() + ->set_value(1.5); + }); + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* cluster = bootstrap.mutable_static_resources()->mutable_clusters(0); + auto* load_assignment = cluster->mutable_load_assignment(); + load_assignment->clear_endpoints(); + for (int i = 0; i < 5; ++i) { + auto locality = load_assignment->add_endpoints(); + locality->add_lb_endpoints()->mutable_endpoint()->MergeFrom( + ConfigHelper::buildEndpoint(Network::Test::getLoopbackAddressString(version_))); + } + }); + + setUpstreamCount(5); + initialize(); + + std::list clients; + std::list 
encoders; + std::list responses; + std::vector fake_connections{15}; + + int upstream_index = 0; + for (uint32_t i = 0; i < 10; ++i) { + // Start a new request. + clients.push_back(makeHttpConnection(lookupPort("http"))); + auto encoder_decoder = clients.back()->startRequest(default_request_headers_); + encoders.push_back(&encoder_decoder.first); + responses.push_back(std::move(encoder_decoder.second)); + + // For each HTTP request, a new connection will be established, as none of + // the streams are closed so no connections can be reused. + waitForNextUpstreamConnection(std::vector({0, 1, 2, 3, 4}), + TestUtility::DefaultTimeout, fake_connections[upstream_index]); + ++upstream_index; + + // For every other connection, an extra connection should be preconnected. + if (i % 2 == 0) { + waitForNextUpstreamConnection(std::vector({0, 1, 2, 3, 4}), + TestUtility::DefaultTimeout, fake_connections[upstream_index]); + ++upstream_index; + } + } + + // Clean up. + while (!clients.empty()) { + clients.front()->close(); + clients.pop_front(); + } +} + +TEST_P(IntegrationTest, RandomPreconnect) { + config_helper_.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + bootstrap.mutable_static_resources() + ->mutable_clusters(0) + ->mutable_preconnect_policy() + ->mutable_predictive_preconnect_ratio() + ->set_value(1.5); + }); + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* cluster = bootstrap.mutable_static_resources()->mutable_clusters(0); + auto* load_assignment = cluster->mutable_load_assignment(); + load_assignment->clear_endpoints(); + for (int i = 0; i < 5; ++i) { + auto locality = load_assignment->add_endpoints(); + locality->add_lb_endpoints()->mutable_endpoint()->MergeFrom( + ConfigHelper::buildEndpoint(Network::Test::getLoopbackAddressString(version_))); + } + }); + + setUpstreamCount(5); + TestRandomGenerator rand; + autonomous_upstream_ = true; + initialize(); + + std::list 
clients; + std::list encoders; + std::list responses; + const uint32_t num_requests = 50; + + for (uint32_t i = 0; i < num_requests; ++i) { + if (rand.random() % 5 <= 3) { // Bias slightly towards more connections + // Start a new request. + clients.push_back(makeHttpConnection(lookupPort("http"))); + auto encoder_decoder = clients.back()->startRequest(default_request_headers_); + encoders.push_back(&encoder_decoder.first); + responses.push_back(std::move(encoder_decoder.second)); + } else if (!clients.empty()) { + // Finish up a request. + clients.front()->sendData(*encoders.front(), 0, true); + encoders.pop_front(); + responses.front()->waitForEndStream(); + responses.pop_front(); + clients.front()->close(); + clients.pop_front(); + } + } + // Clean up. + while (!clients.empty()) { + clients.front()->sendData(*encoders.front(), 0, true); + encoders.pop_front(); + responses.front()->waitForEndStream(); + responses.pop_front(); + clients.front()->close(); + clients.pop_front(); + } +} + } // namespace Envoy