diff --git a/.azure-pipelines/cve_scan.yml b/.azure-pipelines/cve_scan.yml index cbd024add1048..7c951df66f689 100644 --- a/.azure-pipelines/cve_scan.yml +++ b/.azure-pipelines/cve_scan.yml @@ -4,14 +4,13 @@ trigger: none pr: none -# This appears to be broken right now so disabling until it is fixed. -# schedules: -# - cron: "0 * * * *" -# displayName: Hourly CVE scan -# branches: -# include: -# - main -# always: true +schedules: +- cron: "0 * * * *" + displayName: Hourly CVE scan + branches: + include: + - main + always: true pool: vmImage: "ubuntu-18.04" diff --git a/api/buf.yaml b/api/buf.yaml index 781f01f972ced..6a6cdf05c0ff9 100644 --- a/api/buf.yaml +++ b/api/buf.yaml @@ -1,6 +1,6 @@ version: v1beta1 deps: - - buf.build/googleapis/googleapis + - buf.build/googleapis/googleapis:d1a849b8f8304950832335723096e954 - buf.build/beta/opencensus - buf.build/beta/prometheus - buf.build/beta/opentelemetry diff --git a/api/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto b/api/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto index f60865c62315e..37560feba3c27 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto @@ -150,7 +150,6 @@ message ExternalProcessor { string stat_prefix = 8; } -// [#not-implemented-hide:] // Extra settings that may be added to per-route configuration for a // virtual host or cluster. message ExtProcPerRoute { @@ -161,23 +160,27 @@ message ExtProcPerRoute { // If disabled is specified in multiple per-filter-configs, the most specific one will be used. bool disabled = 1 [(validate.rules).bool = {const: true}]; - // Override aspects of the configuration for this route + // Override aspects of the configuration for this route. A set of + // overrides in a more specific configuration will override a "disabled" + // flag set in a less-specific one. 
ExtProcOverrides overrides = 2; } } -// [#not-implemented-hide:] // Overrides that may be set on a per-route basis message ExtProcOverrides { // Set a different processing mode for this route than the default. ProcessingMode processing_mode = 1; + // [#not-implemented-hide:] // Set a different asynchronous processing option than the default. bool async_mode = 2; + // [#not-implemented-hide:] // Set different optional properties than the default. repeated string request_properties = 3; + // [#not-implemented-hide:] // Set different optional properties than the default. repeated string response_properties = 4; } diff --git a/api/envoy/extensions/filters/udp/dns_filter/v3alpha/BUILD b/api/envoy/extensions/filters/udp/dns_filter/v3alpha/BUILD index 7f28290d4ee02..1f8dbc5af5610 100644 --- a/api/envoy/extensions/filters/udp/dns_filter/v3alpha/BUILD +++ b/api/envoy/extensions/filters/udp/dns_filter/v3alpha/BUILD @@ -6,6 +6,7 @@ licenses(["notice"]) # Apache 2 api_proto_package( deps = [ + "//envoy/annotations:pkg", "//envoy/config/core/v3:pkg", "//envoy/data/dns/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", diff --git a/api/envoy/extensions/filters/udp/dns_filter/v3alpha/dns_filter.proto b/api/envoy/extensions/filters/udp/dns_filter/v3alpha/dns_filter.proto index 8221c11efbe78..39f44724c430f 100644 --- a/api/envoy/extensions/filters/udp/dns_filter/v3alpha/dns_filter.proto +++ b/api/envoy/extensions/filters/udp/dns_filter/v3alpha/dns_filter.proto @@ -2,12 +2,14 @@ syntax = "proto3"; package envoy.extensions.filters.udp.dns_filter.v3alpha; +import "envoy/config/core/v3/address.proto"; import "envoy/config/core/v3/base.proto"; import "envoy/config/core/v3/resolver.proto"; import "envoy/data/dns/v3/dns_table.proto"; import "google/protobuf/duration.proto"; +import "envoy/annotations/deprecation.proto"; import "udpa/annotations/status.proto"; import "validate/validate.proto"; @@ -44,6 +46,8 @@ message DnsFilterConfig { // in a client context. 
This message will contain the timeouts, retry, // and forwarding configuration for Envoy to make DNS requests to other // resolvers + // + // [#next-free-field: 6] message ClientContextConfig { // Sets the maximum time we will wait for the upstream query to complete // We allow 5s for the upstream resolution to complete, so the minimum @@ -51,8 +55,20 @@ message DnsFilterConfig { // number of retries multiplied by the resolver_timeout. google.protobuf.Duration resolver_timeout = 1 [(validate.rules).duration = {gte {seconds: 1}}]; + // This field was used for `dns_resolution_config` in Envoy 1.19.0 and + // 1.19.1. + // Control planes that need to set this field for Envoy 1.19.0 and + // 1.19.1 clients should fork the protobufs and change the field type + // to `DnsResolutionConfig`. + // Control planes that need to simultaneously support Envoy 1.18.x and + // Envoy 1.19.x should avoid Envoy 1.19.0 and 1.19.1. + // + // [#not-implemented-hide:] + repeated config.core.v3.Address upstream_resolvers = 2 + [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]; + // DNS resolution configuration which includes the underlying dns resolver addresses and options. - config.core.v3.DnsResolutionConfig dns_resolution_config = 2; + config.core.v3.DnsResolutionConfig dns_resolution_config = 5; // Controls how many outstanding external lookup contexts the filter tracks. 
// The context structure allows the filter to respond to every query even if the external diff --git a/bazel/README.md b/bazel/README.md index cf575fa58c197..3828e675a0b37 100644 --- a/bazel/README.md +++ b/bazel/README.md @@ -7,7 +7,7 @@ It is recommended to use [Bazelisk](https://github.com/bazelbuild/bazelisk) inst On Linux, run the following commands: ```console -sudo wget -O /usr/local/bin/bazel https://github.com/bazelbuild/bazelisk/releases/latest/download/bazelisk-linux-amd64 +sudo wget -O /usr/local/bin/bazel https://github.com/bazelbuild/bazelisk/releases/latest/download/bazelisk-linux-$([ $(uname -m) = "aarch64" ] && echo "arm64" || echo "amd64") sudo chmod +x /usr/local/bin/bazel ``` diff --git a/docs/root/api-docs/xds_protocol.rst b/docs/root/api-docs/xds_protocol.rst index 77428cc932ad8..b95b6ec39ac50 100644 --- a/docs/root/api-docs/xds_protocol.rst +++ b/docs/root/api-docs/xds_protocol.rst @@ -851,13 +851,32 @@ names, which the server thought the client was already not subscribed to. The server must cleanly process such a request; it can simply ignore these phantom unsubscriptions. +In most cases (see below for exception), a server does not need to send any response if a request +does nothing except unsubscribe from a resource; in particular, servers are not generally required +to send a response with the unsubscribed resource name in the +:ref:`removed_resources ` +field. + +However, there is one exception to the above: When a client has a wildcard subscription ("*") *and* +a subscription to another specific resource name, it is possible that the specific resource name is +also included in the wildcard subscription, so if the client unsubscribes from that specific +resource name, it does not know whether or not to continue to cache the resource. 
To address this, +the server must send a response that includes the specific resource in either the +:ref:`removed_resources +` +field (if it is not included in the wildcard) or in the +:ref:`resources ` +field (if it *is* included in the wildcard). + Knowing When a Requested Resource Does Not Exist ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -When a resource subscribed to by a client does not exist, the server will send a :ref:`Resource -` whose :ref:`name ` field matches the -name that the client subscribed to and whose :ref:`resource ` -field is unset. This allows the client to quickly determine when a resource does not exist without +When a resource subscribed to by a client does not exist, the server +will send a +:ref:`DeltaDiscoveryResponse ` +message that contains that resource's name in the +:ref:`removed_resources ` +field. This allows the client to quickly determine when a resource does not exist without waiting for a timeout, as would be done in the SotW protocol variants. However, clients are still encouraged to use a timeout to protect against the case where the management server fails to send a response in a timely manner. diff --git a/docs/root/configuration/operations/overload_manager/overload_manager.rst b/docs/root/configuration/operations/overload_manager/overload_manager.rst index 7f034f086151a..12ae610921c09 100644 --- a/docs/root/configuration/operations/overload_manager/overload_manager.rst +++ b/docs/root/configuration/operations/overload_manager/overload_manager.rst @@ -234,12 +234,19 @@ streams based on heap usage as a trigger. When the heap usage is less than 85%, no streams will be reset. When heap usage is at or above 85%, we start to reset buckets according to the strategy described below. When the heap usage is at 95% all streams using >= 1MiB memory are eligible for reset. +This overload action will reset up to 50 streams (this is a hardcoded limit) +per worker every time the action is invoked. 
This is both to reduce the amount +of streams that end up getting reset and to prevent the worker thread from +locking up and triggering the Watchdog system. Given that there are only 8 buckets, we partition the space with a gradation of :math:`gradation = (saturation_threshold - scaling_threshold)/8`. Hence at 85% heap usage we reset streams in the last bucket e.g. those using `>= 128MiB`. At :math:`85% + 1 * gradation` heap usage we reset streams in the last two buckets -e.g. those using `>= 64MiB`. And so forth as the heap usage is higher. +e.g. those using `>= 64MiB`, prioritizing the streams in the last bucket since +there's a hard limit on the number of streams we can reset per invocation. +At :math:`85% + 2 * gradation` heap usage we reset streams in the last three +buckets e.g. those using `>= 32MiB`. And so forth as the heap usage is higher. It's expected that the first few gradations shouldn't trigger anything, unless there's something seriously wrong e.g. in this example streams using `>= diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index a13ea3b633676..c8e227d34b58d 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -17,6 +17,15 @@ Incompatible Behavior Changes :ref:`contrib images `. * contrib: the :ref:`MySQL proxy filter ` has been moved to :ref:`contrib images `. +* dns_filter: :ref:`dns_filter ` + protobuf fields have been renumbered to restore compatibility with Envoy + 1.18, breaking compatibility with Envoy 1.19.0 and 1.19.1. The new field + numbering allows control planes supporting Envoy 1.18 to gracefully upgrade to + :ref:`dns_resolution_config `, + provided they skip over Envoy 1.19.0 and 1.19.1. + Control planes upgrading from Envoy 1.19.0 and 1.19.1 will need to + vendor the corresponding protobuf definitions to ensure that the + renumbered fields have the types expected by those releases. 
* ext_authz: fixed skipping authentication when returning either a direct response or a redirect. This behavior can be temporarily reverted by setting the ``envoy.reloadable_features.http_ext_authz_do_not_skip_direct_response_and_redirect`` runtime guard to false. Minor Behavior Changes diff --git a/envoy/common/conn_pool.h b/envoy/common/conn_pool.h index 17cf77eca6824..8fd7f1061bf95 100644 --- a/envoy/common/conn_pool.h +++ b/envoy/common/conn_pool.h @@ -36,6 +36,21 @@ class Cancellable { virtual void cancel(CancelPolicy cancel_policy) PURE; }; +/** + * Controls the behavior when draining a connection pool. + */ +enum class DrainBehavior { + // Starts draining a pool, by gracefully completing all requests and gracefully closing all + // connections, in preparation for deletion. It is invalid to create new streams or + // connections from this pool after draining a pool with this behavior. + DrainAndDelete, + // Actively drain all existing connection pool connections. This can be used in cases where + // the connection pool is not being destroyed, but the caller wishes to make sure that + // all new streams take place on a new connection. For example, when a health check failure + // occurs. + DrainExistingConnections, +}; + /** * An instance of a generic connection pool. */ @@ -59,20 +74,10 @@ class Instance { virtual bool isIdle() const PURE; /** - * Starts draining a pool, by gracefully completing all requests and gracefully closing all - * connections, in preparation for deletion. When the process completes, the function registered - * via `addIdleCallback()` is called. The callback may occur before this call returns if the pool - * can be immediately drained. - */ - virtual void startDrain() PURE; - - /** - * Actively drain all existing connection pool connections. This method can be used in cases - * where the connection pool is not being destroyed, but the caller wishes to make sure that - * all new streams take place on a new connection. 
For example, when a health check failure - * occurs. + * Drains the connections in a pool. + * @param drain_behavior A DrainBehavior that controls the behavior of the draining. */ - virtual void drainConnections() PURE; + virtual void drainConnections(DrainBehavior drain_behavior) PURE; /** * @return Upstream::HostDescriptionConstSharedPtr the host for which connections are pooled. diff --git a/envoy/upstream/load_balancer.h b/envoy/upstream/load_balancer.h index af85933b63d3a..109e2296ca14a 100644 --- a/envoy/upstream/load_balancer.h +++ b/envoy/upstream/load_balancer.h @@ -83,6 +83,25 @@ class LoadBalancerContext { * Returns the transport socket options which should be applied on upstream connections */ virtual Network::TransportSocketOptionsConstSharedPtr upstreamTransportSocketOptions() const PURE; + + // Using uint32_t to express expected status of override host. Every bit in the OverrideHostStatus + // represent an enum value of Host::Health. The specific correspondence is shown below: + // + // * 0b001: Host::Health::Unhealthy + // * 0b010: Host::Health::Degraded + // * 0b100: Host::Health::Healthy + // + // If multiple bit fields are set, it is acceptable as long as the status of override host is in + // any of these statuses. + using OverrideHostStatus = uint32_t; + using OverrideHost = std::pair; + + /** + * Returns the host the load balancer should select directly. If the expected host exists and + * the health status of the host matches the expectation, the load balancer can bypass the load + * balancing algorithm and return the corresponding host directly. 
+ */ + virtual absl::optional overrideHostToSelect() const PURE; }; /** diff --git a/generated_api_shadow/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto b/generated_api_shadow/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto index f60865c62315e..37560feba3c27 100644 --- a/generated_api_shadow/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto +++ b/generated_api_shadow/envoy/extensions/filters/http/ext_proc/v3alpha/ext_proc.proto @@ -150,7 +150,6 @@ message ExternalProcessor { string stat_prefix = 8; } -// [#not-implemented-hide:] // Extra settings that may be added to per-route configuration for a // virtual host or cluster. message ExtProcPerRoute { @@ -161,23 +160,27 @@ message ExtProcPerRoute { // If disabled is specified in multiple per-filter-configs, the most specific one will be used. bool disabled = 1 [(validate.rules).bool = {const: true}]; - // Override aspects of the configuration for this route + // Override aspects of the configuration for this route. A set of + // overrides in a more specific configuration will override a "disabled" + // flag set in a less-specific one. ExtProcOverrides overrides = 2; } } -// [#not-implemented-hide:] // Overrides that may be set on a per-route basis message ExtProcOverrides { // Set a different processing mode for this route than the default. ProcessingMode processing_mode = 1; + // [#not-implemented-hide:] // Set a different asynchronous processing option than the default. bool async_mode = 2; + // [#not-implemented-hide:] // Set different optional properties than the default. repeated string request_properties = 3; + // [#not-implemented-hide:] // Set different optional properties than the default. 
repeated string response_properties = 4; } diff --git a/generated_api_shadow/envoy/extensions/filters/udp/dns_filter/v3alpha/BUILD b/generated_api_shadow/envoy/extensions/filters/udp/dns_filter/v3alpha/BUILD index 7f28290d4ee02..1f8dbc5af5610 100644 --- a/generated_api_shadow/envoy/extensions/filters/udp/dns_filter/v3alpha/BUILD +++ b/generated_api_shadow/envoy/extensions/filters/udp/dns_filter/v3alpha/BUILD @@ -6,6 +6,7 @@ licenses(["notice"]) # Apache 2 api_proto_package( deps = [ + "//envoy/annotations:pkg", "//envoy/config/core/v3:pkg", "//envoy/data/dns/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", diff --git a/generated_api_shadow/envoy/extensions/filters/udp/dns_filter/v3alpha/dns_filter.proto b/generated_api_shadow/envoy/extensions/filters/udp/dns_filter/v3alpha/dns_filter.proto index 8221c11efbe78..39f44724c430f 100644 --- a/generated_api_shadow/envoy/extensions/filters/udp/dns_filter/v3alpha/dns_filter.proto +++ b/generated_api_shadow/envoy/extensions/filters/udp/dns_filter/v3alpha/dns_filter.proto @@ -2,12 +2,14 @@ syntax = "proto3"; package envoy.extensions.filters.udp.dns_filter.v3alpha; +import "envoy/config/core/v3/address.proto"; import "envoy/config/core/v3/base.proto"; import "envoy/config/core/v3/resolver.proto"; import "envoy/data/dns/v3/dns_table.proto"; import "google/protobuf/duration.proto"; +import "envoy/annotations/deprecation.proto"; import "udpa/annotations/status.proto"; import "validate/validate.proto"; @@ -44,6 +46,8 @@ message DnsFilterConfig { // in a client context. This message will contain the timeouts, retry, // and forwarding configuration for Envoy to make DNS requests to other // resolvers + // + // [#next-free-field: 6] message ClientContextConfig { // Sets the maximum time we will wait for the upstream query to complete // We allow 5s for the upstream resolution to complete, so the minimum @@ -51,8 +55,20 @@ message DnsFilterConfig { // number of retries multiplied by the resolver_timeout. 
google.protobuf.Duration resolver_timeout = 1 [(validate.rules).duration = {gte {seconds: 1}}]; + // This field was used for `dns_resolution_config` in Envoy 1.19.0 and + // 1.19.1. + // Control planes that need to set this field for Envoy 1.19.0 and + // 1.19.1 clients should fork the protobufs and change the field type + // to `DnsResolutionConfig`. + // Control planes that need to simultaneously support Envoy 1.18.x and + // Envoy 1.19.x should avoid Envoy 1.19.0 and 1.19.1. + // + // [#not-implemented-hide:] + repeated config.core.v3.Address upstream_resolvers = 2 + [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]; + // DNS resolution configuration which includes the underlying dns resolver addresses and options. - config.core.v3.DnsResolutionConfig dns_resolution_config = 2; + config.core.v3.DnsResolutionConfig dns_resolution_config = 5; // Controls how many outstanding external lookup contexts the filter tracks. // The context structure allows the filter to respond to every query even if the external diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index 734fed3fb0443..10ab4500f77bf 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -16,6 +16,9 @@ namespace { // Effectively disables tracking as this should zero out all reasonable account // balances when shifted by this amount. constexpr uint32_t kEffectivelyDisableTrackingBitshift = 63; +// 50 is an arbitrary limit, and is meant to both limit the number of streams +// Envoy ends up resetting and avoid triggering the Watchdog system. 
+constexpr uint32_t kMaxNumberOfStreamsToResetPerInvocation = 50; } // end namespace void WatermarkBuffer::add(const void* data, uint64_t size) { @@ -194,19 +197,23 @@ uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) { // Compute buckets to clear const uint32_t buckets_to_clear = std::min( std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8); - uint32_t bucket_idx = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_to_clear; - - ENVOY_LOG_MISC(warn, "resetting streams in buckets >= {}", bucket_idx); - uint64_t num_streams_reset = 0; - // TODO(kbaichoo): Add a limit to the number of streams we reset - // per-invocation of this function. - // Clear buckets - while (bucket_idx < BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) { + + uint32_t last_bucket_to_clear = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_to_clear; + ENVOY_LOG_MISC(warn, "resetting streams in buckets >= {}", last_bucket_to_clear); + + // Clear buckets, prioritizing the buckets with larger streams. + uint32_t num_streams_reset = 0; + for (uint32_t buckets_cleared = 0; buckets_cleared < buckets_to_clear; ++buckets_cleared) { + const uint32_t bucket_to_clear = + BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_cleared - 1; ENVOY_LOG_MISC(warn, "resetting {} streams in bucket {}.", - size_class_account_sets_[bucket_idx].size(), bucket_idx); + size_class_account_sets_[bucket_to_clear].size(), bucket_to_clear); - auto it = size_class_account_sets_[bucket_idx].begin(); - while (it != size_class_account_sets_[bucket_idx].end()) { + auto it = size_class_account_sets_[bucket_to_clear].begin(); + while (it != size_class_account_sets_[bucket_to_clear].end()) { + if (num_streams_reset >= kMaxNumberOfStreamsToResetPerInvocation) { + return num_streams_reset; + } auto next = std::next(it); // This will trigger an erase, which avoids rehashing and invalidates the // iterator *it*. *next* is still valid. 
@@ -214,8 +221,6 @@ uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) { it = next; ++num_streams_reset; } - - ++bucket_idx; } return num_streams_reset; diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index e91cc10709790..113836ab13648 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -229,6 +229,7 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien } ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& context) { + ASSERT(!is_draining_for_deletion_); ASSERT(!deferred_deleting_); ASSERT(static_cast(connecting_stream_capacity_) == @@ -331,11 +332,6 @@ void ConnPoolImplBase::transitionActiveClientState(ActiveClient& client, void ConnPoolImplBase::addIdleCallbackImpl(Instance::IdleCb cb) { idle_callbacks_.push_back(cb); } -void ConnPoolImplBase::startDrainImpl() { - is_draining_ = true; - checkForIdleAndCloseIdleConnsIfDraining(); -} - void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() { // Create a separate list of elements to close to avoid mutate-while-iterating problems. 
std::list to_close; @@ -359,7 +355,12 @@ void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() { } } -void ConnPoolImplBase::drainConnectionsImpl() { +void ConnPoolImplBase::drainConnectionsImpl(DrainBehavior drain_behavior) { + if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) { + is_draining_for_deletion_ = true; + checkForIdleAndCloseIdleConnsIfDraining(); + return; + } closeIdleConnectionsForDrainingPool(); // closeIdleConnections() closes all connections in ready_clients_ with no active streams, @@ -387,12 +388,13 @@ bool ConnPoolImplBase::isIdleImpl() const { } void ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining() { - if (is_draining_) { + if (is_draining_for_deletion_) { closeIdleConnectionsForDrainingPool(); } if (isIdleImpl()) { - ENVOY_LOG(debug, "invoking idle callbacks - is_draining_={}", is_draining_); + ENVOY_LOG(debug, "invoking idle callbacks - is_draining_for_deletion_={}", + is_draining_for_deletion_); for (const Instance::IdleCb& cb : idle_callbacks_) { cb(); } diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 5ce4d9936feef..dafc845f88915 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -166,8 +166,7 @@ class ConnPoolImplBase : protected Logger::Loggable { void addIdleCallbackImpl(Instance::IdleCb cb); // Returns true if the pool is idle. bool isIdleImpl() const; - void startDrainImpl(); - void drainConnectionsImpl(); + void drainConnectionsImpl(DrainBehavior drain_behavior); const Upstream::HostConstSharedPtr& host() const { return host_; } // Called if this pool is likely to be picked soon, to determine if it's worth preconnecting. bool maybePreconnectImpl(float global_preconnect_ratio); @@ -335,7 +334,7 @@ class ConnPoolImplBase : protected Logger::Loggable { // Whether the connection pool is currently in the process of closing // all connections so that it can be gracefully deleted. 
- bool is_draining_{false}; + bool is_draining_for_deletion_{false}; // True iff this object is in the deferred delete list. bool deferred_deleting_{false}; diff --git a/source/common/http/conn_pool_base.h b/source/common/http/conn_pool_base.h index e14a6ffa2ae3a..47bdfdad7f3f2 100644 --- a/source/common/http/conn_pool_base.h +++ b/source/common/http/conn_pool_base.h @@ -61,8 +61,9 @@ class HttpConnPoolImplBase : public Envoy::ConnectionPool::ConnPoolImplBase, // ConnectionPool::Instance void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); } bool isIdle() const override { return isIdleImpl(); } - void startDrain() override { startDrainImpl(); } - void drainConnections() override { drainConnectionsImpl(); } + void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override { + drainConnectionsImpl(drain_behavior); + } Upstream::HostDescriptionConstSharedPtr host() const override { return host_; } ConnectionPool::Cancellable* newStream(Http::ResponseDecoder& response_decoder, Http::ConnectionPool::Callbacks& callbacks) override; diff --git a/source/common/http/conn_pool_grid.cc b/source/common/http/conn_pool_grid.cc index f5e38a1a9e2ef..bb70fad9978c9 100644 --- a/source/common/http/conn_pool_grid.cc +++ b/source/common/http/conn_pool_grid.cc @@ -296,24 +296,20 @@ void ConnectivityGrid::addIdleCallback(IdleCb cb) { idle_callbacks_.emplace_back(cb); } -void ConnectivityGrid::startDrain() { +void ConnectivityGrid::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) { if (draining_) { // A drain callback has already been set, and only needs to happen once. return; } - // Note that no new pools can be created from this point on - // as createNextPool fast-fails if `draining_` is true. 
- draining_ = true; - - for (auto& pool : pools_) { - pool->startDrain(); + if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) { + // Note that no new pools can be created from this point on + // as createNextPool fast-fails if `draining_` is true. + draining_ = true; } -} -void ConnectivityGrid::drainConnections() { for (auto& pool : pools_) { - pool->drainConnections(); + pool->drainConnections(drain_behavior); } } diff --git a/source/common/http/conn_pool_grid.h b/source/common/http/conn_pool_grid.h index c9b01a74e1622..2c4832067f6ad 100644 --- a/source/common/http/conn_pool_grid.h +++ b/source/common/http/conn_pool_grid.h @@ -148,8 +148,7 @@ class ConnectivityGrid : public ConnectionPool::Instance, ConnectionPool::Callbacks& callbacks) override; void addIdleCallback(IdleCb cb) override; bool isIdle() const override; - void startDrain() override; - void drainConnections() override; + void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override; Upstream::HostDescriptionConstSharedPtr host() const override; bool maybePreconnect(float preconnect_ratio) override; absl::string_view protocolDescription() const override { return "connection grid"; } diff --git a/source/common/tcp/conn_pool.h b/source/common/tcp/conn_pool.h index 949feeb7ee7a2..d3a4a6d81d37e 100644 --- a/source/common/tcp/conn_pool.h +++ b/source/common/tcp/conn_pool.h @@ -150,9 +150,11 @@ class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase, void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); } bool isIdle() const override { return isIdleImpl(); } - void startDrain() override { startDrainImpl(); } - void drainConnections() override { - drainConnectionsImpl(); + void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override { + drainConnectionsImpl(drain_behavior); + if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) { + return; + } // Legacy behavior for the TCP connection pool 
marks all connecting clients // as draining. for (auto& connecting_client : connecting_clients_) { diff --git a/source/common/tcp/original_conn_pool.cc b/source/common/tcp/original_conn_pool.cc index 325c424aa61c6..7321d3723a5c6 100644 --- a/source/common/tcp/original_conn_pool.cc +++ b/source/common/tcp/original_conn_pool.cc @@ -37,7 +37,13 @@ OriginalConnPoolImpl::~OriginalConnPoolImpl() { dispatcher_.clearDeferredDeleteList(); } -void OriginalConnPoolImpl::drainConnections() { +void OriginalConnPoolImpl::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) { + if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) { + is_draining_ = true; + checkForIdleAndCloseIdleConnsIfDraining(); + return; + } + ENVOY_LOG(debug, "draining connections"); while (!ready_conns_.empty()) { ready_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush); @@ -70,11 +76,6 @@ void OriginalConnPoolImpl::closeConnections() { void OriginalConnPoolImpl::addIdleCallback(IdleCb cb) { idle_callbacks_.push_back(cb); } -void OriginalConnPoolImpl::startDrain() { - is_draining_ = true; - checkForIdleAndCloseIdleConnsIfDraining(); -} - void OriginalConnPoolImpl::assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& callbacks) { ASSERT(conn.wrapper_ == nullptr); diff --git a/source/common/tcp/original_conn_pool.h b/source/common/tcp/original_conn_pool.h index d5e79580c5b46..8e4d07a1f9e2c 100644 --- a/source/common/tcp/original_conn_pool.h +++ b/source/common/tcp/original_conn_pool.h @@ -34,8 +34,7 @@ class OriginalConnPoolImpl : Logger::Loggable, public Connecti // ConnectionPool::Instance void addIdleCallback(IdleCb cb) override; bool isIdle() const override; - void startDrain() override; - void drainConnections() override; + void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override; void closeConnections() override; ConnectionPool::Cancellable* newConnection(ConnectionPool::Callbacks& callbacks) override; // The 
old pool does not implement preconnecting. diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index f2c9ff1271b99..cbe3507907f5d 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -1197,7 +1197,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools( // guarding deletion with `do_not_delete_` in the registered idle callback, and then checking // afterwards whether it is empty and deleting it if necessary. container.do_not_delete_ = true; - pools->startDrain(); + pools->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); container.do_not_delete_ = false; if (container.pools_->size() == 0) { @@ -1217,7 +1217,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainTcpConnPools( container.draining_ = true; for (auto pool : pools) { - pool->startDrain(); + pool->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); } } @@ -1389,7 +1389,8 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainAllConnPoolsWorker( const auto container = getHttpConnPoolsContainer(host); if (container != nullptr) { container->do_not_delete_ = true; - container->pools_->drainConnections(); + container->pools_->drainConnections( + Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); container->do_not_delete_ = false; if (container->pools_->size() == 0) { @@ -1417,7 +1418,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainAllConnPoolsWorker( ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) { pool->closeConnections(); } else { - pool->drainConnections(); + pool->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); } } } diff --git a/source/common/upstream/conn_pool_map.h b/source/common/upstream/conn_pool_map.h index 4d6a557d7e680..b3840c3600cd6 100644 --- a/source/common/upstream/conn_pool_map.h +++ 
b/source/common/upstream/conn_pool_map.h @@ -3,6 +3,7 @@ #include #include +#include "envoy/common/conn_pool.h" #include "envoy/event/dispatcher.h" #include "envoy/upstream/resource_manager.h" #include "envoy/upstream/upstream.h" @@ -58,15 +59,10 @@ template class ConnPoolMap { */ void addIdleCallback(const IdleCb& cb); - /** - * See `Envoy::ConnectionPool::Instance::startDrain()`. - */ - void startDrain(); - /** * See `Envoy::ConnectionPool::Instance::drainConnections()`. */ - void drainConnections(); + void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior); private: /** diff --git a/source/common/upstream/conn_pool_map_impl.h b/source/common/upstream/conn_pool_map_impl.h index 18176d0a18ec8..63db84a047412 100644 --- a/source/common/upstream/conn_pool_map_impl.h +++ b/source/common/upstream/conn_pool_map_impl.h @@ -99,7 +99,8 @@ void ConnPoolMap::addIdleCallback(const IdleCb& cb) { } template -void ConnPoolMap::startDrain() { +void ConnPoolMap::drainConnections( + Envoy::ConnectionPool::DrainBehavior drain_behavior) { // Copy the `active_pools_` so that it is safe for the call to result // in deletion, and avoid iteration through a mutating container. std::vector pools; @@ -109,22 +110,7 @@ void ConnPoolMap::startDrain() { } for (auto* pool : pools) { - pool->startDrain(); - } -} - -template -void ConnPoolMap::drainConnections() { - // Copy the `active_pools_` so that it is safe for the call to result - // in deletion, and avoid iteration through a mutating container. 
- std::vector pools; - pools.reserve(active_pools_.size()); - for (auto& pool_pair : active_pools_) { - pools.push_back(pool_pair.second.get()); - } - - for (auto* pool : pools) { - pool->drainConnections(); + pool->drainConnections(drain_behavior); } } diff --git a/source/common/upstream/load_balancer_impl.cc b/source/common/upstream/load_balancer_impl.cc index 47db0423169e5..52619e57f0391 100644 --- a/source/common/upstream/load_balancer_impl.cc +++ b/source/common/upstream/load_balancer_impl.cc @@ -1,5 +1,6 @@ #include "source/common/upstream/load_balancer_impl.h" +#include #include #include #include @@ -22,6 +23,10 @@ static const std::string RuntimeZoneEnabled = "upstream.zone_routing.enabled"; static const std::string RuntimeMinClusterSize = "upstream.zone_routing.min_cluster_size"; static const std::string RuntimePanicThreshold = "upstream.healthy_panic_threshold"; +static constexpr uint32_t UnhealthyStatus = 1u << static_cast(Host::Health::Unhealthy); +static constexpr uint32_t DegradedStatus = 1u << static_cast(Host::Health::Degraded); +static constexpr uint32_t HealthyStatus = 1u << static_cast(Host::Health::Healthy); + bool tooManyPreconnects(size_t num_preconnect_picks, uint32_t healthy_hosts) { // Currently we only allow the number of preconnected connections to equal the // number of healthy hosts. @@ -361,6 +366,9 @@ ZoneAwareLoadBalancerBase::ZoneAwareLoadBalancerBase( resizePerPriorityState(); priority_update_cb_ = priority_set_.addPriorityUpdateCb( [this](uint32_t priority, const HostVector&, const HostVector&) -> void { + // Update cross priority host map for fast host searching. + cross_priority_host_map_ = priority_set_.crossPriorityHostMap(); + // Make sure per_priority_state_ is as large as priority_set_.hostSetsPerPriority() resizePerPriorityState(); // If P=0 changes, regenerate locality routing structures. 
Locality based routing is @@ -507,8 +515,59 @@ bool ZoneAwareLoadBalancerBase::earlyExitNonLocalityRouting() { return false; } -HostConstSharedPtr LoadBalancerBase::chooseHost(LoadBalancerContext* context) { - HostConstSharedPtr host; +bool LoadBalancerContextBase::validateOverrideHostStatus(Host::Health health, + OverrideHostStatus status) { + switch (health) { + case Host::Health::Unhealthy: + return status & UnhealthyStatus; + case Host::Health::Degraded: + return status & DegradedStatus; + case Host::Health::Healthy: + return status & HealthyStatus; + } + return false; +} + +HostConstSharedPtr LoadBalancerContextBase::selectOverrideHost(const HostMap* host_map, + LoadBalancerContext* context) { + if (context == nullptr) { + return nullptr; + } + + auto override_host = context->overrideHostToSelect(); + if (!override_host.has_value()) { + return nullptr; + } + + if (host_map == nullptr) { + return nullptr; + } + + auto host_iter = host_map->find(override_host.value().first); + + // The override host cannot be found in the host map. + if (host_iter == host_map->end()) { + return nullptr; + } + + HostConstSharedPtr host = host_iter->second; + ASSERT(host != nullptr); + + // Verify the host status. + if (LoadBalancerContextBase::validateOverrideHostStatus(host->health(), + override_host.value().second)) { + return host; + } + return nullptr; +} + +HostConstSharedPtr ZoneAwareLoadBalancerBase::chooseHost(LoadBalancerContext* context) { + HostConstSharedPtr host = + LoadBalancerContextBase::selectOverrideHost(cross_priority_host_map_.get(), context); + if (host != nullptr) { + return host; + } + const size_t max_attempts = context ? 
context->hostSelectionRetryCount() + 1 : 1; for (size_t i = 0; i < max_attempts; ++i) { host = chooseHostOnce(context); diff --git a/source/common/upstream/load_balancer_impl.h b/source/common/upstream/load_balancer_impl.h index 219b10f21e131..f38e3f5765167 100644 --- a/source/common/upstream/load_balancer_impl.h +++ b/source/common/upstream/load_balancer_impl.h @@ -40,24 +40,7 @@ class LoadBalancerBase : public LoadBalancer { choosePriority(uint64_t hash, const HealthyLoad& healthy_per_priority_load, const DegradedLoad& degraded_per_priority_load); - HostConstSharedPtr chooseHost(LoadBalancerContext* context) override; - protected: - /** - * By implementing this method instead of chooseHost, host selection will - * be subject to host filters specified by LoadBalancerContext. - * - * Host selection will be retried up to the number specified by - * hostSelectionRetryCount on LoadBalancerContext, and if no hosts are found - * within the allowed attempts, the host that was selected during the last - * attempt will be returned. - * - * If host selection is idempotent (i.e. retrying will not change the outcome), - * sub classes should override chooseHost to avoid the unnecessary overhead of - * retrying host selection. - */ - virtual HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) PURE; - /** * For the given host_set @return if we should be in a panic mode or not. For example, if the * majority of hosts are unhealthy we'll be likely in a panic mode. In this case we'll route @@ -147,6 +130,7 @@ class LoadBalancerBase : public LoadBalancer { return std::min(health + degraded, 100); } + // The percentage load (0-100) for each priority level when targeting healthy hosts and // the percentage load (0-100) for each priority level when targeting degraded hosts. 
HealthyAndDegradedLoad per_priority_load_; @@ -165,6 +149,11 @@ class LoadBalancerBase : public LoadBalancer { class LoadBalancerContextBase : public LoadBalancerContext { public: + static bool validateOverrideHostStatus(Host::Health health, OverrideHostStatus status); + + static HostConstSharedPtr selectOverrideHost(const HostMap* host_map, + LoadBalancerContext* context); + absl::optional computeHashKey() override { return {}; } const Network::Connection* downstreamConnection() const override { return nullptr; } @@ -188,12 +177,17 @@ class LoadBalancerContextBase : public LoadBalancerContext { Network::TransportSocketOptionsConstSharedPtr upstreamTransportSocketOptions() const override { return nullptr; } + + absl::optional overrideHostToSelect() const override { return {}; } }; /** * Base class for zone aware load balancers */ class ZoneAwareLoadBalancerBase : public LoadBalancerBase { +public: + HostConstSharedPtr chooseHost(LoadBalancerContext* context) override; + protected: // Both priority_set and local_priority_set if non-null must have at least one host set. ZoneAwareLoadBalancerBase( @@ -258,6 +252,21 @@ class ZoneAwareLoadBalancerBase : public LoadBalancerBase { } }; + /** + * By implementing this method instead of chooseHost, host selection will + * be subject to host filters specified by LoadBalancerContext. + * + * Host selection will be retried up to the number specified by + * hostSelectionRetryCount on LoadBalancerContext, and if no hosts are found + * within the allowed attempts, the host that was selected during the last + * attempt will be returned. + * + * If host selection is idempotent (i.e. retrying will not change the outcome), + * sub classes should override chooseHost to avoid the unnecessary overhead of + * retrying host selection. + */ + virtual HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) PURE; + /** * Pick the host source to use, doing zone aware routing when the hosts are sufficiently healthy. 
* If no host is chosen (due to fail_traffic_on_panic being set), return absl::nullopt. @@ -269,6 +278,10 @@ class ZoneAwareLoadBalancerBase : public LoadBalancerBase { */ const HostVector& hostSourceToHosts(HostsSource hosts_source) const; + // Cross priority host map for fast cross priority host searching. When the priority update + // callback is executed, the host map will also be updated. + HostMapConstSharedPtr cross_priority_host_map_; + private: enum class LocalityRoutingState { // Locality based routing is off. @@ -381,7 +394,7 @@ class EdfLoadBalancerBase : public ZoneAwareLoadBalancerBase { Random::RandomGenerator& random, const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config); - // Upstream::LoadBalancerBase + // Upstream::ZoneAwareLoadBalancerBase HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context) override; HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) override; @@ -579,7 +592,7 @@ class RandomLoadBalancer : public ZoneAwareLoadBalancerBase { : ZoneAwareLoadBalancerBase(priority_set, local_priority_set, stats, runtime, random, common_config) {} - // Upstream::LoadBalancerBase + // Upstream::ZoneAwareLoadBalancerBase HostConstSharedPtr chooseHostOnce(LoadBalancerContext* context) override; HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context) override; diff --git a/source/common/upstream/priority_conn_pool_map.h b/source/common/upstream/priority_conn_pool_map.h index c43ba46c06ea5..d3c3c66bd4714 100644 --- a/source/common/upstream/priority_conn_pool_map.h +++ b/source/common/upstream/priority_conn_pool_map.h @@ -51,15 +51,10 @@ template class PriorityConnPoolMap { */ void addIdleCallback(const IdleCb& cb); - /** - * See `Envoy::ConnectionPool::Instance::startDrain()`. - */ - void startDrain(); - /** * See `Envoy::ConnectionPool::Instance::drainConnections()`. 
*/ - void drainConnections(); + void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior); private: size_t getPriorityIndex(ResourcePriority priority) const; diff --git a/source/common/upstream/priority_conn_pool_map_impl.h b/source/common/upstream/priority_conn_pool_map_impl.h index 66cc9ff4407ea..a706938b8e182 100644 --- a/source/common/upstream/priority_conn_pool_map_impl.h +++ b/source/common/upstream/priority_conn_pool_map_impl.h @@ -55,16 +55,10 @@ void PriorityConnPoolMap::addIdleCallback(const IdleCb& cb) } template -void PriorityConnPoolMap::startDrain() { +void PriorityConnPoolMap::drainConnections( + ConnectionPool::DrainBehavior drain_behavior) { for (auto& pool_map : conn_pool_maps_) { - pool_map->startDrain(); - } -} - -template -void PriorityConnPoolMap::drainConnections() { - for (auto& pool_map : conn_pool_maps_) { - pool_map->drainConnections(); + pool_map->drainConnections(drain_behavior); } } diff --git a/source/common/upstream/subset_lb.cc b/source/common/upstream/subset_lb.cc index 2a080c32ee60a..4c5a420a94501 100644 --- a/source/common/upstream/subset_lb.cc +++ b/source/common/upstream/subset_lb.cc @@ -82,6 +82,9 @@ SubsetLoadBalancer::SubsetLoadBalancer( // performed. rebuildSingle(); + // Update cross priority host map. + cross_priority_host_map_ = original_priority_set_.crossPriorityHostMap(); + if (hosts_added.empty() && hosts_removed.empty()) { // It's possible that metadata changed, without hosts being added nor removed. 
// If so we need to add any new subsets, remove unused ones, and regroup hosts into @@ -276,6 +279,12 @@ void SubsetLoadBalancer::initSelectorFallbackSubset( } HostConstSharedPtr SubsetLoadBalancer::chooseHost(LoadBalancerContext* context) { + HostConstSharedPtr override_host = + LoadBalancerContextBase::selectOverrideHost(cross_priority_host_map_.get(), context); + if (override_host != nullptr) { + return override_host; + } + if (context) { bool host_chosen; HostConstSharedPtr host = tryChooseHostFromContext(context, host_chosen); diff --git a/source/common/upstream/subset_lb.h b/source/common/upstream/subset_lb.h index d64f2f54839e4..354341ff060cf 100644 --- a/source/common/upstream/subset_lb.h +++ b/source/common/upstream/subset_lb.h @@ -162,6 +162,10 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::LoggableupstreamTransportSocketOptions(); } + absl::optional overrideHostToSelect() const override { + return wrapped_->overrideHostToSelect(); + } + private: LoadBalancerContext* wrapped_; Router::MetadataMatchCriteriaConstPtr metadata_match_; @@ -268,6 +272,10 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::Loggable single_host_per_subset_map_; Stats::Gauge* single_duplicate_stat_{}; + // Cross priority host map for fast cross priority host searching. When the priority update + // callback is executed, the host map will also be updated. + HostMapConstSharedPtr cross_priority_host_map_; + const bool locality_weight_aware_; const bool scale_locality_weight_; const bool list_as_any_; diff --git a/source/common/upstream/thread_aware_lb_impl.cc b/source/common/upstream/thread_aware_lb_impl.cc index 888f63a3789f3..8da946d77d980 100644 --- a/source/common/upstream/thread_aware_lb_impl.cc +++ b/source/common/upstream/thread_aware_lb_impl.cc @@ -95,7 +95,10 @@ void ThreadAwareLoadBalancerBase::initialize() { // complicated initialization as the load balancer would need its own initialized callback. 
I // think the synchronous/asynchronous split is probably the best option. priority_update_cb_ = priority_set_.addPriorityUpdateCb( - [this](uint32_t, const HostVector&, const HostVector&) -> void { refresh(); }); + [this](uint32_t, const HostVector&, const HostVector&) -> void { + refresh(); + threadSafeSetCrossPriorityHostMap(priority_set_.crossPriorityHostMap()); + }); refresh(); } @@ -141,6 +144,12 @@ ThreadAwareLoadBalancerBase::LoadBalancerImpl::chooseHost(LoadBalancerContext* c return nullptr; } + HostConstSharedPtr host = + LoadBalancerContextBase::selectOverrideHost(cross_priority_host_map_.get(), context); + if (host != nullptr) { + return host; + } + // If there is no hash in the context, just choose a random value (this effectively becomes // the random LB but it won't crash if someone configures it this way). // computeHashKey() may be computed on demand, so get it only once. @@ -158,7 +167,6 @@ ThreadAwareLoadBalancerBase::LoadBalancerImpl::chooseHost(LoadBalancerContext* c stats_.lb_healthy_panic_.inc(); } - HostConstSharedPtr host; const uint32_t max_attempts = context ? context->hostSelectionRetryCount() + 1 : 1; for (uint32_t i = 0; i < max_attempts; ++i) { host = per_priority_state->current_lb_->chooseHost(h, i); @@ -173,7 +181,8 @@ ThreadAwareLoadBalancerBase::LoadBalancerImpl::chooseHost(LoadBalancerContext* c } LoadBalancerPtr ThreadAwareLoadBalancerBase::LoadBalancerFactoryImpl::create() { - auto lb = std::make_unique(stats_, random_); + auto lb = std::make_unique( + stats_, random_, thread_aware_lb_.threadSafeGetCrossPriorityHostMap()); // We must protect current_lb_ via a RW lock since it is accessed and written to by multiple // threads. All complex processing has already been precalculated however. 
diff --git a/source/common/upstream/thread_aware_lb_impl.h b/source/common/upstream/thread_aware_lb_impl.h index 38c04e99b5019..81a1e5e2e4c83 100644 --- a/source/common/upstream/thread_aware_lb_impl.h +++ b/source/common/upstream/thread_aware_lb_impl.h @@ -87,10 +87,8 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL LoadBalancerFactorySharedPtr factory() override { return factory_; } void initialize() override; - // Upstream::LoadBalancerBase - HostConstSharedPtr chooseHostOnce(LoadBalancerContext*) override { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; - } + // Upstream::LoadBalancer + HostConstSharedPtr chooseHost(LoadBalancerContext*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } // Preconnect not implemented for hash based load balancing HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; } @@ -100,7 +98,7 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL Random::RandomGenerator& random, const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config) : LoadBalancerBase(priority_set, stats, runtime, random, common_config), - factory_(new LoadBalancerFactoryImpl(stats, random)) {} + factory_(new LoadBalancerFactoryImpl(stats, random, *this)) {} private: struct PerPriorityState { @@ -110,8 +108,9 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL using PerPriorityStatePtr = std::unique_ptr; struct LoadBalancerImpl : public LoadBalancer { - LoadBalancerImpl(ClusterStats& stats, Random::RandomGenerator& random) - : stats_(stats), random_(random) {} + LoadBalancerImpl(ClusterStats& stats, Random::RandomGenerator& random, + HostMapConstSharedPtr host_map) + : stats_(stats), random_(random), cross_priority_host_map_(std::move(host_map)) {} // Upstream::LoadBalancer HostConstSharedPtr chooseHost(LoadBalancerContext* context) override; @@ -123,15 +122,21 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL 
std::shared_ptr> per_priority_state_; std::shared_ptr healthy_per_priority_load_; std::shared_ptr degraded_per_priority_load_; + + // Cross priority host map. + HostMapConstSharedPtr cross_priority_host_map_; }; struct LoadBalancerFactoryImpl : public LoadBalancerFactory { - LoadBalancerFactoryImpl(ClusterStats& stats, Random::RandomGenerator& random) - : stats_(stats), random_(random) {} + LoadBalancerFactoryImpl(ClusterStats& stats, Random::RandomGenerator& random, + ThreadAwareLoadBalancerBase& thread_aware_lb) + : thread_aware_lb_(thread_aware_lb), stats_(stats), random_(random) {} // Upstream::LoadBalancerFactory LoadBalancerPtr create() override; + ThreadAwareLoadBalancerBase& thread_aware_lb_; + ClusterStats& stats_; Random::RandomGenerator& random_; absl::Mutex mutex_; @@ -146,8 +151,29 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL double min_normalized_weight, double max_normalized_weight) PURE; void refresh(); + void threadSafeSetCrossPriorityHostMap(HostMapConstSharedPtr host_map) { + absl::MutexLock ml(&cross_priority_host_map_mutex_); + cross_priority_host_map_ = std::move(host_map); + } + HostMapConstSharedPtr threadSafeGetCrossPriorityHostMap() { + absl::MutexLock ml(&cross_priority_host_map_mutex_); + return cross_priority_host_map_; + } + std::shared_ptr factory_; Common::CallbackHandlePtr priority_update_cb_; + + // Whenever the membership changes, the cross_priority_host_map_ will be updated automatically. + // And all workers will create a new worker local load balancer and copy the + // cross_priority_host_map_. + // + // This leads to the possibility of simultaneous reading and writing of cross_priority_host_map_ + // in different threads. For this reason, an additional mutex is necessary to guard + // cross_priority_host_map_. + absl::Mutex cross_priority_host_map_mutex_; + // Cross priority host map for fast cross priority host searching. 
When the priority update + // callback is executed, the host map will also be updated. + HostMapConstSharedPtr cross_priority_host_map_ ABSL_GUARDED_BY(cross_priority_host_map_mutex_); }; } // namespace Upstream diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 5897aab3e2f1e..2eec93be78914 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -551,16 +551,18 @@ void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_ const HostVector& hosts_added, const HostVector& hosts_removed, absl::optional overprovisioning_factor, HostMapConstSharedPtr cross_priority_host_map) { + // Update cross priority host map first. In this way, when the update callbacks of the priority + // set are executed, the latest host map can always be obtained. + if (cross_priority_host_map != nullptr) { + const_cross_priority_host_map_ = std::move(cross_priority_host_map); + } + // Ensure that we have a HostSet for the given priority. 
getOrCreateHostSet(priority, overprovisioning_factor); static_cast(host_sets_[priority].get()) ->updateHosts(std::move(update_hosts_params), std::move(locality_weights), hosts_added, hosts_removed, overprovisioning_factor); - if (cross_priority_host_map != nullptr) { - const_cross_priority_host_map_ = std::move(cross_priority_host_map); - } - if (!batch_update_) { runUpdateCallbacks(hosts_added, hosts_removed); } diff --git a/source/extensions/clusters/aggregate/cluster.h b/source/extensions/clusters/aggregate/cluster.h index 031a3fe2b63d3..5c23d632d3e8c 100644 --- a/source/extensions/clusters/aggregate/cluster.h +++ b/source/extensions/clusters/aggregate/cluster.h @@ -102,11 +102,6 @@ class AggregateClusterLoadBalancer : public Upstream::LoadBalancer, return nullptr; } - // Upstream::LoadBalancerBase - Upstream::HostConstSharedPtr chooseHostOnce(Upstream::LoadBalancerContext*) override { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; - } - absl::optional hostToLinearizedPriority(const Upstream::HostDescription& host) const; private: diff --git a/source/extensions/clusters/aggregate/lb_context.h b/source/extensions/clusters/aggregate/lb_context.h index 65eb3a5b0db6b..e388cde7dbeb9 100644 --- a/source/extensions/clusters/aggregate/lb_context.h +++ b/source/extensions/clusters/aggregate/lb_context.h @@ -10,7 +10,7 @@ namespace Aggregate { // AggregateLoadBalancerContext wraps the load balancer context to re-assign priority load // according the to host priority selected by the aggregate load balancer. 
-class AggregateLoadBalancerContext : public Upstream::LoadBalancerContext { +class AggregateLoadBalancerContext : public Upstream::LoadBalancerContextBase { public: AggregateLoadBalancerContext(Upstream::LoadBalancerContext* context, Upstream::LoadBalancerBase::HostAvailability host_availability, diff --git a/source/extensions/filters/http/ext_proc/config.cc b/source/extensions/filters/http/ext_proc/config.cc index 221ebac44a969..1e8ec8e321c4d 100644 --- a/source/extensions/filters/http/ext_proc/config.cc +++ b/source/extensions/filters/http/ext_proc/config.cc @@ -1,7 +1,5 @@ #include "source/extensions/filters/http/ext_proc/config.h" -#include - #include "source/extensions/filters/http/ext_proc/client_impl.h" #include "source/extensions/filters/http/ext_proc/ext_proc.h" @@ -14,7 +12,7 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config, const std::string& stats_prefix, Server::Configuration::FactoryContext& context) { const uint32_t message_timeout_ms = - PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, kDefaultMessageTimeoutMs); + PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs); const auto filter_config = std::make_shared( proto_config, std::chrono::milliseconds(message_timeout_ms), context.scope(), stats_prefix); @@ -28,6 +26,13 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro }; } +Router::RouteSpecificFilterConfigConstSharedPtr +ExternalProcessingFilterConfig::createRouteSpecificFilterConfigTyped( + const envoy::extensions::filters::http::ext_proc::v3alpha::ExtProcPerRoute& proto_config, + Server::Configuration::ServerFactoryContext&, ProtobufMessage::ValidationVisitor&) { + return std::make_shared(proto_config); +} + REGISTER_FACTORY(ExternalProcessingFilterConfig, Server::Configuration::NamedHttpFilterConfigFactory){"envoy.ext_proc"}; diff --git 
a/source/extensions/filters/http/ext_proc/config.h b/source/extensions/filters/http/ext_proc/config.h index 71e7fa83753a6..9918f341c402a 100644 --- a/source/extensions/filters/http/ext_proc/config.h +++ b/source/extensions/filters/http/ext_proc/config.h @@ -14,17 +14,23 @@ namespace ExternalProcessing { class ExternalProcessingFilterConfig : public Common::FactoryBase< - envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor> { + envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor, + envoy::extensions::filters::http::ext_proc::v3alpha::ExtProcPerRoute> { public: ExternalProcessingFilterConfig() : FactoryBase("envoy.filters.http.ext_proc") {} private: - static constexpr uint64_t kDefaultMessageTimeoutMs = 200; + static constexpr uint64_t DefaultMessageTimeoutMs = 200; Http::FilterFactoryCb createFilterFactoryFromProtoTyped( const envoy::extensions::filters::http::ext_proc::v3alpha::ExternalProcessor& proto_config, const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override; + + Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped( + const envoy::extensions::filters::http::ext_proc::v3alpha::ExtProcPerRoute& proto_config, + Server::Configuration::ServerFactoryContext& context, + ProtobufMessage::ValidationVisitor& validator) override; }; } // namespace ExternalProcessing diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index 00d22c92a665d..e04a8cf857014 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -1,5 +1,6 @@ #include "source/extensions/filters/http/ext_proc/ext_proc.h" +#include "source/common/http/utility.h" #include "source/extensions/filters/http/ext_proc/mutation_utils.h" #include "absl/strings/str_format.h" @@ -9,6 +10,7 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { +using 
envoy::extensions::filters::http::ext_proc::v3alpha::ExtProcPerRoute; using envoy::extensions::filters::http::ext_proc::v3alpha::ProcessingMode; using envoy::service::ext_proc::v3alpha::ImmediateResponse; @@ -23,8 +25,21 @@ using Http::RequestTrailerMap; using Http::ResponseHeaderMap; using Http::ResponseTrailerMap; -static const std::string kErrorPrefix = "ext_proc error"; +static const std::string ErrorPrefix = "ext_proc error"; static const int DefaultImmediateStatus = 200; +static const std::string FilterName = "envoy.filters.http.ext_proc"; + +FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config) + : disabled_(config.disabled()) { + if (config.has_overrides()) { + processing_mode_ = config.overrides().processing_mode(); + } +} + +void FilterConfigPerRoute::merge(const FilterConfigPerRoute& src) { + disabled_ = src.disabled_; + processing_mode_ = src.processing_mode_; +} void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { Http::PassThroughFilter::setDecoderFilterCallbacks(callbacks); @@ -91,6 +106,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state, FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_stream) { ENVOY_LOG(trace, "decodeHeaders: end_stream = {}", end_stream); + mergePerRouteConfig(); if (end_stream) { decoding_state_.setCompleteBodyAvailable(true); } @@ -508,7 +524,7 @@ void Filter::onGrpcError(Grpc::Status::GrpcStatus status) { cleanUpTimers(); ImmediateResponse errorResponse; errorResponse.mutable_status()->set_code(envoy::type::v3::StatusCode::InternalServerError); - errorResponse.set_details(absl::StrFormat("%s: gRPC error %i", kErrorPrefix, status)); + errorResponse.set_details(absl::StrFormat("%s: gRPC error %i", ErrorPrefix, status)); sendImmediateResponse(errorResponse); } } @@ -541,7 +557,7 @@ void Filter::onMessageTimeout() { encoding_state_.setCallbackState(ProcessorState::CallbackState::Idle); ImmediateResponse errorResponse; 
errorResponse.mutable_status()->set_code(envoy::type::v3::StatusCode::InternalServerError); - errorResponse.set_details(absl::StrFormat("%s: per-message timeout exceeded", kErrorPrefix)); + errorResponse.set_details(absl::StrFormat("%s: per-message timeout exceeded", ErrorPrefix)); sendImmediateResponse(errorResponse); } } @@ -582,6 +598,33 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) { mutate_headers, grpc_status, response.details()); } +static ProcessingMode allDisabledMode() { + ProcessingMode pm; + pm.set_request_header_mode(ProcessingMode::SKIP); + pm.set_response_header_mode(ProcessingMode::SKIP); + return pm; +} + +void Filter::mergePerRouteConfig() { + auto&& merged_config = Http::Utility::getMergedPerFilterConfig( + FilterName, decoder_callbacks_->route(), + [](FilterConfigPerRoute& dst, const FilterConfigPerRoute& src) { dst.merge(src); }); + if (merged_config) { + if (merged_config->disabled()) { + // Rather than introduce yet another flag, use the processing mode + // structure to disable all the callbacks. 
+ ENVOY_LOG(trace, "Disabling filter due to per-route configuration"); + const auto all_disabled = allDisabledMode(); + decoding_state_.setProcessingMode(all_disabled); + encoding_state_.setProcessingMode(all_disabled); + } else if (merged_config->processingMode()) { + ENVOY_LOG(trace, "Setting new processing mode from per-route configuration"); + decoding_state_.setProcessingMode(*(merged_config->processingMode())); + encoding_state_.setProcessingMode(*(merged_config->processingMode())); + } + } +} + std::string responseCaseToString(const ProcessingResponse::ResponseCase response_case) { switch (response_case) { case ProcessingResponse::ResponseCase::kRequestHeaders: diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index a01732874c74a..ca6c87060dd82 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -72,6 +72,25 @@ class FilterConfig { using FilterConfigSharedPtr = std::shared_ptr; +class FilterConfigPerRoute : public Router::RouteSpecificFilterConfig { +public: + explicit FilterConfigPerRoute( + const envoy::extensions::filters::http::ext_proc::v3alpha::ExtProcPerRoute& config); + + void merge(const FilterConfigPerRoute& other); + + bool disabled() const { return disabled_; } + const absl::optional& + processingMode() const { + return processing_mode_; + } + +private: + bool disabled_; + absl::optional + processing_mode_; +}; + class Filter : public Logger::Loggable, public Http::PassThroughFilter, public ExternalProcessorCallbacks { @@ -128,6 +147,7 @@ class Filter : public Logger::Loggable, void sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers); private: + void mergePerRouteConfig(); StreamOpenState openStream(); void cleanUpTimers(); diff --git a/source/extensions/filters/network/dubbo_proxy/dubbo_protocol_impl.cc b/source/extensions/filters/network/dubbo_proxy/dubbo_protocol_impl.cc index 
01d86199fc64c..52cea791c9150 100644 --- a/source/extensions/filters/network/dubbo_proxy/dubbo_protocol_impl.cc +++ b/source/extensions/filters/network/dubbo_proxy/dubbo_protocol_impl.cc @@ -183,7 +183,11 @@ bool DubboProtocolImpl::encode(Buffer::Instance& buffer, const MessageMetadata& buffer.writeByte(flag); buffer.writeByte(static_cast(metadata.responseStatus())); buffer.writeBEInt(metadata.requestId()); - buffer.writeBEInt(0); + // Body of heart beat response is null. + // TODO(wbpcode): Currently we only support the Hessian2 serialization scheme, so here we + // directly use the 'N' for null object in Hessian2. This coupling should be unnecessary. + buffer.writeBEInt(1u); + buffer.writeByte('N'); return true; } case MessageType::Response: { diff --git a/source/server/filter_chain_manager_impl.cc b/source/server/filter_chain_manager_impl.cc index 01e63097f2c63..f1a74a70e78cd 100644 --- a/source/server/filter_chain_manager_impl.cc +++ b/source/server/filter_chain_manager_impl.cc @@ -31,9 +31,7 @@ Network::Address::InstanceConstSharedPtr fakeAddress() { PerFilterChainFactoryContextImpl::PerFilterChainFactoryContextImpl( Configuration::FactoryContext& parent_context, Init::Manager& init_manager) - : parent_context_(parent_context), scope_(parent_context_.scope().createScope("")), - filter_chain_scope_(parent_context_.listenerScope().createScope("")), - init_manager_(init_manager) {} + : parent_context_(parent_context), init_manager_(init_manager) {} bool PerFilterChainFactoryContextImpl::drainClose() const { return is_draining_.load() || parent_context_.drainDecision().drainClose(); @@ -103,7 +101,7 @@ Envoy::Runtime::Loader& PerFilterChainFactoryContextImpl::runtime() { return parent_context_.runtime(); } -Stats::Scope& PerFilterChainFactoryContextImpl::scope() { return *scope_; } +Stats::Scope& PerFilterChainFactoryContextImpl::scope() { return parent_context_.scope(); } Singleton::Manager& PerFilterChainFactoryContextImpl::singletonManager() { return 
parent_context_.singletonManager(); @@ -137,7 +135,9 @@ PerFilterChainFactoryContextImpl::getTransportSocketFactoryContext() const { return parent_context_.getTransportSocketFactoryContext(); } -Stats::Scope& PerFilterChainFactoryContextImpl::listenerScope() { return *filter_chain_scope_; } +Stats::Scope& PerFilterChainFactoryContextImpl::listenerScope() { + return parent_context_.listenerScope(); +} bool PerFilterChainFactoryContextImpl::isQuicListener() const { return parent_context_.isQuicListener(); diff --git a/source/server/filter_chain_manager_impl.h b/source/server/filter_chain_manager_impl.h index 3919aeb81073d..8218e89777127 100644 --- a/source/server/filter_chain_manager_impl.h +++ b/source/server/filter_chain_manager_impl.h @@ -87,10 +87,6 @@ class PerFilterChainFactoryContextImpl : public Configuration::FilterChainFactor private: Configuration::FactoryContext& parent_context_; - // The scope that has empty prefix. - Stats::ScopePtr scope_; - // filter_chain_scope_ has the same prefix as listener owners scope. 
- Stats::ScopePtr filter_chain_scope_; Init::Manager& init_manager_; std::atomic is_draining_{false}; }; diff --git a/test/common/buffer/buffer_memory_account_test.cc b/test/common/buffer/buffer_memory_account_test.cc index a651eca14dffa..b6b6a20193717 100644 --- a/test/common/buffer/buffer_memory_account_test.cc +++ b/test/common/buffer/buffer_memory_account_test.cc @@ -22,6 +22,7 @@ using MemoryClassesToAccountsSet = std::array()), + account_(factory.createAccount(*reset_handler_)) {} + + void expectResetStream() { + EXPECT_CALL(*reset_handler_, resetStream(_)).WillOnce([this](Http::StreamResetReason) { + account_->credit(getBalance(account_)); + account_->clearDownstream(); + reset_handler_invoked_ = true; + }); + } + + std::unique_ptr reset_handler_; + bool reset_handler_invoked_{false}; + BufferMemoryAccountSharedPtr account_; +}; + +using AccountWithResetHandlerPtr = std::unique_ptr; + +TEST(WatermarkBufferFactoryTest, + LimitsNumberOfStreamsResetPerInvocationOfResetAccountsGivenPressure) { + TrackedWatermarkBufferFactory factory(absl::bit_width(kMinimumBalanceToTrack)); + + std::vector accounts_to_reset; + for (int i = 0; i < 2 * kMaxStreamsResetPerCall; ++i) { + accounts_to_reset.push_back(std::make_unique(factory)); + accounts_to_reset.back()->account_->charge(kThresholdForFinalBucket); + accounts_to_reset.back()->expectResetStream(); + } + + // Assert accounts tracked. + factory.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + ASSERT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + 2 * kMaxStreamsResetPerCall); + }); + + // We should only reset up to the max number of streams that should be reset. 
+ int streams_reset = 0; + EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall); + for (const auto& account : accounts_to_reset) { + if (account->reset_handler_invoked_) { + ++streams_reset; + } + } + + EXPECT_EQ(streams_reset, kMaxStreamsResetPerCall); + + // Subsequent call to reset the remaining streams. + EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall); + for (const auto& account : accounts_to_reset) { + EXPECT_TRUE(account->reset_handler_invoked_); + } +} + +// Tests that of the eligible streams to reset, we start resetting the largest +// streams. +TEST(WatermarkBufferFactoryTest, + ShouldPrioritizeResettingTheLargestEligibleStreamsPerInvocationOfResetAccountGivenPressure) { + TrackedWatermarkBufferFactory factory(absl::bit_width(kMinimumBalanceToTrack)); + + std::vector accounts_reset_in_first_batch; + for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + accounts_reset_in_first_batch.push_back(std::make_unique(factory)); + accounts_reset_in_first_batch.back()->account_->charge(kThresholdForFinalBucket); + accounts_reset_in_first_batch.back()->expectResetStream(); + } + + std::vector accounts_reset_in_second_batch; + for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + accounts_reset_in_second_batch.push_back(std::make_unique(factory)); + accounts_reset_in_second_batch.back()->account_->charge(kMinimumBalanceToTrack); + } + + // Assert accounts tracked. + factory.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + ASSERT_EQ(memory_classes_to_account[0].size(), kMaxStreamsResetPerCall); + ASSERT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + kMaxStreamsResetPerCall); + }); + + // All buckets are eligible for having streams reset given the pressure. + // However we will hit the maximum number to reset per call and shouldn't + // have any in the second batch reset. 
+ EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall); + for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + EXPECT_TRUE(accounts_reset_in_first_batch[i]->reset_handler_invoked_); + EXPECT_FALSE(accounts_reset_in_second_batch[i]->reset_handler_invoked_); + } + + // Subsequent call should get those in the second batch. + for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + accounts_reset_in_second_batch[i]->expectResetStream(); + } + EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall); + for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + EXPECT_TRUE(accounts_reset_in_second_batch[i]->reset_handler_invoked_); + } +} + } // namespace } // namespace Buffer } // namespace Envoy diff --git a/test/common/conn_pool/conn_pool_base_test.cc b/test/common/conn_pool/conn_pool_base_test.cc index 869dafa68a678..17dcfe7e8473a 100644 --- a/test/common/conn_pool/conn_pool_base_test.cc +++ b/test/common/conn_pool/conn_pool_base_test.cc @@ -227,7 +227,7 @@ TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredRemoteClose) { clients_.back()->onEvent(Network::ConnectionEvent::RemoteClose); EXPECT_CALL(idle_pool_callback, Call()); - pool_.startDrainImpl(); + pool_.drainConnectionsImpl(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); } // Local close simulates what would happen for an idle timeout on a connection. 
@@ -255,7 +255,7 @@ TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredLocalClose) { clients_.back()->onEvent(Network::ConnectionEvent::LocalClose); EXPECT_CALL(idle_pool_callback, Call()); - pool_.startDrainImpl(); + pool_.drainConnectionsImpl(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); } } // namespace ConnectionPool diff --git a/test/common/http/conn_pool_grid_test.cc b/test/common/http/conn_pool_grid_test.cc index 908e82bf1282b..873a813be4501 100644 --- a/test/common/http/conn_pool_grid_test.cc +++ b/test/common/http/conn_pool_grid_test.cc @@ -382,20 +382,23 @@ TEST_F(ConnectivityGridTest, TestCancel) { // Make sure drains get sent to all active pools. TEST_F(ConnectivityGridTest, Drain) { - grid_.drainConnections(); + grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); // Synthetically create a pool. grid_.createNextPool(); { - EXPECT_CALL(*grid_.first(), drainConnections()); - grid_.drainConnections(); + EXPECT_CALL(*grid_.first(), + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); + grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); } grid_.createNextPool(); { - EXPECT_CALL(*grid_.first(), drainConnections()); - EXPECT_CALL(*grid_.second(), drainConnections()); - grid_.drainConnections(); + EXPECT_CALL(*grid_.first(), + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); + EXPECT_CALL(*grid_.second(), + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); + grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); } } @@ -411,16 +414,22 @@ TEST_F(ConnectivityGridTest, DrainCallbacks) { // The first time a drain is started, both pools should start draining. 
{ - EXPECT_CALL(*grid_.first(), startDrain()); - EXPECT_CALL(*grid_.second(), startDrain()); - grid_.startDrain(); + EXPECT_CALL(*grid_.first(), + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)); + EXPECT_CALL(*grid_.second(), + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)); + grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); } // The second time, the pools will not see any change. { - EXPECT_CALL(*grid_.first(), startDrain()).Times(0); - EXPECT_CALL(*grid_.second(), startDrain()).Times(0); - grid_.startDrain(); + EXPECT_CALL(*grid_.first(), + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .Times(0); + EXPECT_CALL(*grid_.second(), + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .Times(0); + grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); } { // Notify the grid the second pool has been drained. This should not be @@ -481,7 +490,7 @@ TEST_F(ConnectivityGridTest, NoDrainOnTeardown) { { grid_.addIdleCallback([&drain_received]() -> void { drain_received = true; }); - grid_.startDrain(); + grid_.drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); } grid_.setDestroying(); // Fake being in the destructor. diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index f603967b14c97..75b124986a808 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -261,7 +261,7 @@ TEST_F(Http1ConnPoolImplTest, DrainConnections) { r1.completeResponse(false); // This will destroy the ready client and set requests remaining to 1 on the busy client. 
- conn_pool_->drainConnections(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); EXPECT_CALL(*conn_pool_, onClientDestroy()); dispatcher_.clearDeferredDeleteList(); conn_pool_->expectAndRunUpstreamReady(); @@ -951,7 +951,7 @@ TEST_F(Http1ConnPoolImplTest, DrainCallback) { ActiveTestRequest r2(*this, 0, ActiveTestRequest::Type::Pending); conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); r2.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::Default); EXPECT_EQ(1U, cluster_->stats_.upstream_rq_total_.value()); @@ -978,7 +978,7 @@ TEST_F(Http1ConnPoolImplTest, DrainWhileConnecting) { EXPECT_NE(nullptr, handle); conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); EXPECT_CALL(*conn_pool_->test_clients_[0].connection_, close(Network::ConnectionCloseType::NoFlush)); EXPECT_CALL(drained, ready()).Times(AtLeast(1)); @@ -1046,7 +1046,7 @@ TEST_F(Http1ConnPoolImplTest, ActiveRequestHasActiveConnectionsTrue) { // cleanup conn_pool_->expectEnableUpstreamReady(); r1.completeResponse(false); - conn_pool_->drainConnections(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); EXPECT_CALL(*conn_pool_, onClientDestroy()); dispatcher_.clearDeferredDeleteList(); conn_pool_->expectAndRunUpstreamReady(); @@ -1060,7 +1060,7 @@ TEST_F(Http1ConnPoolImplTest, ResponseCompletedConnectionReadyNoActiveConnection EXPECT_FALSE(conn_pool_->hasActiveConnections()); - conn_pool_->drainConnections(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); EXPECT_CALL(*conn_pool_, onClientDestroy()); dispatcher_.clearDeferredDeleteList(); conn_pool_->expectAndRunUpstreamReady(); @@ -1075,7 +1075,7 @@ 
TEST_F(Http1ConnPoolImplTest, PendingRequestIsConsideredActive) { EXPECT_CALL(*conn_pool_, onClientDestroy()); r1.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::Default); EXPECT_EQ(0U, cluster_->stats_.upstream_rq_total_.value()); - conn_pool_->drainConnections(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); conn_pool_->test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); dispatcher_.clearDeferredDeleteList(); diff --git a/test/common/http/http2/conn_pool_test.cc b/test/common/http/http2/conn_pool_test.cc index 4f1b8e6f4bbec..7a1c272a1796c 100644 --- a/test/common/http/http2/conn_pool_test.cc +++ b/test/common/http/http2/conn_pool_test.cc @@ -320,7 +320,7 @@ TEST_F(Http2ConnPoolImplTest, DrainConnectionIdle) { completeRequest(r); EXPECT_CALL(*this, onClientDestroy()); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); } /** @@ -389,7 +389,7 @@ TEST_F(Http2ConnPoolImplTest, DrainConnectionReadyWithRequest) { ->encodeHeaders(TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true) .ok()); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); EXPECT_CALL(r.decoder_, decodeHeaders_(_, true)); EXPECT_CALL(*this, onClientDestroy()); @@ -414,7 +414,7 @@ TEST_F(Http2ConnPoolImplTest, DrainConnectionBusy) { ->encodeHeaders(TestRequestHeaderMapImpl{{":path", "/"}, {":method", "GET"}}, true) .ok()); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); EXPECT_CALL(r.decoder_, decodeHeaders_(_, true)); EXPECT_CALL(*this, onClientDestroy()); @@ -434,12 +434,12 @@ TEST_F(Http2ConnPoolImplTest, DrainConnectionConnecting) { ActiveTestRequest r(*this, 0, false); // Pending request prevents the connection from being drained - pool_->drainConnections(); + 
pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); // Cancel the pending request, and then the connection can be closed. r.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::Default); EXPECT_CALL(*this, onClientDestroy()); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); } /** @@ -452,7 +452,7 @@ TEST_F(Http2ConnPoolImplTest, CloseExcess) { ActiveTestRequest r(*this, 0, false); // Pending request prevents the connection from being drained - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); EXPECT_CALL(*this, onClientDestroy()); r.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess); @@ -593,7 +593,7 @@ TEST_F(Http2ConnPoolImplTest, DrainConnections) { cluster_->max_requests_per_connection_ = 1; // Test drain connections call prior to any connections being created. - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); expectClientCreate(); ActiveTestRequest r1(*this, 0, false); @@ -616,7 +616,7 @@ TEST_F(Http2ConnPoolImplTest, DrainConnections) { .ok()); // This will move the second connection to draining. - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); // This will destroy the 2 draining connections. 
test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); @@ -1091,7 +1091,7 @@ TEST_F(Http2ConnPoolImplTest, DrainDisconnectWithActiveRequest) { .ok()); ReadyWatcher drained; pool_->addIdleCallback([&]() -> void { drained.ready(); }); - pool_->startDrain(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); EXPECT_CALL(dispatcher_, deferredDelete_(_)); EXPECT_CALL(drained, ready()); @@ -1128,7 +1128,7 @@ TEST_F(Http2ConnPoolImplTest, DrainDisconnectDrainingWithActiveRequest) { ReadyWatcher drained; pool_->addIdleCallback([&]() -> void { drained.ready(); }); - pool_->startDrain(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); EXPECT_CALL(dispatcher_, deferredDelete_(_)); EXPECT_CALL(r2.decoder_, decodeHeaders_(_, true)); @@ -1172,7 +1172,7 @@ TEST_F(Http2ConnPoolImplTest, DrainPrimary) { ReadyWatcher drained; pool_->addIdleCallback([&]() -> void { drained.ready(); }); - pool_->startDrain(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); EXPECT_CALL(dispatcher_, deferredDelete_(_)); EXPECT_CALL(r2.decoder_, decodeHeaders_(_, true)); @@ -1228,7 +1228,7 @@ TEST_F(Http2ConnPoolImplTest, DrainPrimaryNoActiveRequest) { ReadyWatcher drained; EXPECT_CALL(drained, ready()); pool_->addIdleCallback([&]() -> void { drained.ready(); }); - pool_->startDrain(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); EXPECT_CALL(*this, onClientDestroy()); dispatcher_.clearDeferredDeleteList(); @@ -1382,7 +1382,7 @@ TEST_F(Http2ConnPoolImplTest, DrainingConnectionsConsideredActive) { expectClientCreate(); ActiveTestRequest r1(*this, 0, false); expectClientConnect(0, r1); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); EXPECT_TRUE(pool_->hasActiveConnections()); @@ -1396,7 +1396,7 @@ TEST_F(Http2ConnPoolImplTest, DrainedConnectionsNotActive) { expectClientCreate(); 
ActiveTestRequest r1(*this, 0, false); expectClientConnect(0, r1); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); completeRequest(r1); EXPECT_FALSE(pool_->hasActiveConnections()); @@ -1429,7 +1429,7 @@ TEST_F(Http2ConnPoolImplTest, PreconnectWithoutMultiplexing) { r1.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess); r2.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess); r3.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); closeAllClients(); } @@ -1483,7 +1483,7 @@ TEST_F(Http2ConnPoolImplTest, PreconnectWithMultiplexing) { // Clean up. r1.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess); r2.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); closeAllClients(); } @@ -1527,7 +1527,7 @@ TEST_F(Http2ConnPoolImplTest, PreconnectWithSettings) { CHECK_STATE(2 /*active*/, 0 /*pending*/, 0 /*capacity*/); // Clean up. - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); closeAllClients(); } @@ -1553,7 +1553,7 @@ TEST_F(Http2ConnPoolImplTest, PreconnectWithGoaway) { CHECK_STATE(1 /*active*/, 0 /*pending*/, 0 /*capacity*/); // Clean up. - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); closeAllClients(); } @@ -1579,7 +1579,7 @@ TEST_F(Http2ConnPoolImplTest, PreconnectEvenWhenReady) { // Clean up. 
completeRequest(r1); completeRequest(r2); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); closeAllClients(); } @@ -1600,7 +1600,7 @@ TEST_F(Http2ConnPoolImplTest, PreconnectAfterTimeout) { // Clean up. completeRequest(r1); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); closeAllClients(); } @@ -1625,7 +1625,7 @@ TEST_F(Http2ConnPoolImplTest, CloseExcessWithPreconnect) { // Clean up. r1.handle_->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); closeAllClients(); } @@ -1638,7 +1638,7 @@ TEST_F(Http2ConnPoolImplTest, MaybePreconnect) { expectClientsCreate(1); EXPECT_TRUE(pool_->maybePreconnect(2)); - pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); closeAllClients(); } @@ -1675,7 +1675,7 @@ TEST_F(Http2ConnPoolImplTest, TestStateWithMultiplexing) { CHECK_STATE(0 /*active*/, 0 /*pending*/, 1 /*capacity*/); // Clean up with an outstanding stream. 
- pool_->drainConnections(); + pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); closeAllClients(); CHECK_STATE(0 /*active*/, 0 /*pending*/, 0 /*capacity*/); } diff --git a/test/common/http/mixed_conn_pool_test.cc b/test/common/http/mixed_conn_pool_test.cc index 3f22bfc5b876c..90b0fc2cee33d 100644 --- a/test/common/http/mixed_conn_pool_test.cc +++ b/test/common/http/mixed_conn_pool_test.cc @@ -86,7 +86,7 @@ void MixedConnPoolImplTest::testAlpnHandshake(absl::optional protocol) EXPECT_EQ(protocol.value(), conn_pool_->protocol()); } - conn_pool_->drainConnections(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); connection->raiseEvent(Network::ConnectionEvent::RemoteClose); dispatcher_.clearDeferredDeleteList(); conn_pool_.reset(); diff --git a/test/common/tcp/conn_pool_test.cc b/test/common/tcp/conn_pool_test.cc index de4f489e05f46..04178b7207b38 100644 --- a/test/common/tcp/conn_pool_test.cc +++ b/test/common/tcp/conn_pool_test.cc @@ -107,8 +107,9 @@ class ConnPoolBase : public Tcp::ConnectionPool::Instance { void addIdleCallback(IdleCb cb) override { conn_pool_->addIdleCallback(cb); } bool isIdle() const override { return conn_pool_->isIdle(); } - void startDrain() override { return conn_pool_->startDrain(); } - void drainConnections() override { conn_pool_->drainConnections(); } + void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override { + conn_pool_->drainConnections(drain_behavior); + } void closeConnections() override { conn_pool_->closeConnections(); } ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override { return conn_pool_->newConnection(callbacks); @@ -442,7 +443,7 @@ TEST_P(TcpConnPoolImplTest, DrainConnections) { // This will destroy the ready connection and set requests remaining to 1 on the busy and // pending connections. 
EXPECT_CALL(*conn_pool_, onConnDestroyedForTest()); - conn_pool_->drainConnections(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); dispatcher_.clearDeferredDeleteList(); } { @@ -975,12 +976,11 @@ TEST_P(TcpConnPoolImplTest, ConnectionStateWithConcurrentConnections) { TEST_P(TcpConnPoolImplTest, DrainCallback) { initialize(); ReadyWatcher drained; - EXPECT_CALL(drained, ready()); conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection); ActiveTestConn c2(*this, 0, ActiveTestConn::Type::Pending); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); c2.handle_->cancel(ConnectionPool::CancelPolicy::Default); EXPECT_CALL(*conn_pool_, onConnReleasedForTest()); @@ -1005,7 +1005,7 @@ TEST_P(TcpConnPoolImplTest, DrainWhileConnecting) { EXPECT_NE(nullptr, handle); conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); if (test_new_connection_pool_) { // The shared connection pool removes and closes connecting clients if there are no @@ -1029,12 +1029,11 @@ TEST_P(TcpConnPoolImplTest, DrainWhileConnecting) { TEST_P(TcpConnPoolImplTest, DrainOnClose) { initialize(); ReadyWatcher drained; - EXPECT_CALL(drained, ready()); conn_pool_->addIdleCallback([&]() -> void { drained.ready(); }); - conn_pool_->startDrain(); - ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); + EXPECT_CALL(drained, ready()).Times(AtLeast(1)); EXPECT_CALL(c1.callbacks_.callbacks_, onEvent(Network::ConnectionEvent::RemoteClose)) .WillOnce(Invoke([&](Network::ConnectionEvent event) -> void { @@ -1077,7 +1076,7 @@ TEST_P(TcpConnPoolImplTest, RequestCapacity) { // This should set the number of requests 
remaining to 1 on the active // connections, and the connecting_request_capacity to 2 as well. - conn_pool_->drainConnections(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); // Cancel the connections. Because neither used CloseExcess, the two connections should persist. handle1->cancel(ConnectionPool::CancelPolicy::Default); @@ -1130,7 +1129,7 @@ TEST_P(TcpConnPoolImplTest, TestIdleTimeout) { testing::MockFunction drained_callback; EXPECT_CALL(idle_callback, Call()); - conn_pool_->startDrain(); + conn_pool_->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); EXPECT_CALL(*conn_pool_, onConnDestroyedForTest()); dispatcher_.clearDeferredDeleteList(); } diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index a0a36ca636bae..684e16be97e75 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -277,6 +277,7 @@ envoy_cc_test( "//test/mocks:common_lib", "//test/mocks/runtime:runtime_mocks", "//test/mocks/upstream:cluster_info_mocks", + "//test/mocks/upstream:host_mocks", "//test/mocks/upstream:host_set_mocks", "//test/mocks/upstream:load_balancer_context_mock", "//test/mocks/upstream:priority_set_mocks", @@ -520,7 +521,9 @@ envoy_cc_test( "//test/mocks:common_lib", "//test/mocks/runtime:runtime_mocks", "//test/mocks/upstream:cluster_info_mocks", + "//test/mocks/upstream:host_mocks", "//test/mocks/upstream:host_set_mocks", + "//test/mocks/upstream:load_balancer_context_mock", "//test/mocks/upstream:priority_set_mocks", "//test/test_common:simulated_time_system_lib", "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", @@ -535,7 +538,9 @@ envoy_cc_test( "//source/common/upstream:maglev_lb_lib", "//test/mocks:common_lib", "//test/mocks/upstream:cluster_info_mocks", + "//test/mocks/upstream:host_mocks", "//test/mocks/upstream:host_set_mocks", + "//test/mocks/upstream:load_balancer_context_mock", "//test/mocks/upstream:priority_set_mocks", 
"//test/test_common:simulated_time_system_lib", "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", @@ -599,6 +604,7 @@ envoy_cc_test( "//test/mocks/upstream:cluster_info_mocks", "//test/mocks/upstream:host_mocks", "//test/mocks/upstream:host_set_mocks", + "//test/mocks/upstream:load_balancer_context_mock", "//test/mocks/upstream:load_balancer_mocks", "//test/mocks/upstream:priority_set_mocks", "//test/test_common:simulated_time_system_lib", diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index b8f8003ce34f7..86f133ecef107 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -1769,8 +1769,8 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { // Now remove the cluster. This should drain the connection pools, but not affect // tcp connections. EXPECT_CALL(*callbacks, onClusterRemoval(_)); - EXPECT_CALL(*cp, startDrain()); - EXPECT_CALL(*cp2, startDrain()); + EXPECT_CALL(*cp, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)); + EXPECT_CALL(*cp2, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)); EXPECT_TRUE(cluster_manager_->removeCluster("fake_cluster")); EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("fake_cluster")); EXPECT_EQ(0UL, cluster_manager_->clusters().active_clusters_.size()); @@ -1909,7 +1909,8 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) { outlier_detector.runCallbacks(test_host); health_checker.runCallbacks(test_host, HealthTransition::Unchanged); - EXPECT_CALL(*cp1, drainConnections()); + EXPECT_CALL(*cp1, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); test_host->healthFlagSet(Host::HealthFlag::FAILED_OUTLIER_CHECK); outlier_detector.runCallbacks(test_host); @@ -1919,8 +1920,10 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) { } // Order of these calls is implementation 
dependent, so can't sequence them! - EXPECT_CALL(*cp1, drainConnections()); - EXPECT_CALL(*cp2, drainConnections()); + EXPECT_CALL(*cp1, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); + EXPECT_CALL(*cp2, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); test_host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); health_checker.runCallbacks(test_host, HealthTransition::Changed); @@ -1973,7 +1976,9 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsAndDeletePoolOnHealthFailure) outlier_detector.runCallbacks(test_host); health_checker.runCallbacks(test_host, HealthTransition::Unchanged); - EXPECT_CALL(*cp1, drainConnections()).WillOnce(Invoke([&]() { cp1->idle_cb_(); })); + EXPECT_CALL(*cp1, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)) + .WillOnce(Invoke([&]() { cp1->idle_cb_(); })); test_host->healthFlagSet(Host::HealthFlag::FAILED_OUTLIER_CHECK); outlier_detector.runCallbacks(test_host); @@ -2020,7 +2025,8 @@ TEST_F(ClusterManagerImplTest, CloseTcpConnectionPoolsOnHealthFailure) { outlier_detector.runCallbacks(test_host); health_checker.runCallbacks(test_host, HealthTransition::Unchanged); - EXPECT_CALL(*cp1, drainConnections()); + EXPECT_CALL(*cp1, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); test_host->healthFlagSet(Host::HealthFlag::FAILED_OUTLIER_CHECK); outlier_detector.runCallbacks(test_host); @@ -2030,8 +2036,10 @@ TEST_F(ClusterManagerImplTest, CloseTcpConnectionPoolsOnHealthFailure) { } // Order of these calls is implementation dependent, so can't sequence them! 
- EXPECT_CALL(*cp1, drainConnections()); - EXPECT_CALL(*cp2, drainConnections()); + EXPECT_CALL(*cp1, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); + EXPECT_CALL(*cp2, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); test_host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); health_checker.runCallbacks(test_host, HealthTransition::Changed); @@ -2956,14 +2964,16 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveDefaultPriority) { ->tcpConnPool(ResourcePriority::Default, nullptr)); // Immediate drain, since this can happen with the HTTP codecs. - EXPECT_CALL(*cp, startDrain()).WillOnce(Invoke([&]() { - cp->idle_cb_(); - cp->idle_cb_ = nullptr; - })); - EXPECT_CALL(*tcp, startDrain()).WillOnce(Invoke([&]() { - tcp->idle_cb_(); - tcp->idle_cb_ = nullptr; - })); + EXPECT_CALL(*cp, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .WillOnce(Invoke([&]() { + cp->idle_cb_(); + cp->idle_cb_ = nullptr; + })); + EXPECT_CALL(*tcp, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .WillOnce(Invoke([&]() { + tcp->idle_cb_(); + tcp->idle_cb_ = nullptr; + })); // Remove the first host, this should lead to the cp being drained, without // crash. 
@@ -3035,13 +3045,13 @@ TEST_F(ClusterManagerImplTest, ConnPoolDestroyWithDraining) { Http::ConnectionPool::Instance::IdleCb drained_cb; EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)).WillOnce(Return(mock_cp)); EXPECT_CALL(*mock_cp, addIdleCallback(_)).WillOnce(SaveArg<0>(&drained_cb)); - EXPECT_CALL(*mock_cp, startDrain()); + EXPECT_CALL(*mock_cp, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)); MockTcpConnPoolWithDestroy* mock_tcp = new NiceMock(); Tcp::ConnectionPool::Instance::IdleCb tcp_drained_cb; EXPECT_CALL(factory_, allocateTcpConnPool_).WillOnce(Return(mock_tcp)); EXPECT_CALL(*mock_tcp, addIdleCallback(_)).WillOnce(SaveArg<0>(&tcp_drained_cb)); - EXPECT_CALL(*mock_tcp, startDrain()); + EXPECT_CALL(*mock_tcp, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)); HttpPoolDataPeer::getPool( cluster_manager_->getThreadLocalCluster("cluster_1") @@ -4603,22 +4613,26 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { EXPECT_NE(cp1, cp2); EXPECT_NE(tcp1, tcp2); - EXPECT_CALL(*cp2, startDrain()).WillOnce(Invoke([&]() { - cp2->idle_cb_(); - cp2->idle_cb_ = nullptr; - })); - EXPECT_CALL(*cp1, startDrain()).WillOnce(Invoke([&]() { - cp1->idle_cb_(); - cp1->idle_cb_ = nullptr; - })); - EXPECT_CALL(*tcp1, startDrain()).WillOnce(Invoke([&]() { - tcp1->idle_cb_(); - tcp1->idle_cb_ = nullptr; - })); - EXPECT_CALL(*tcp2, startDrain()).WillOnce(Invoke([&]() { - tcp2->idle_cb_(); - tcp2->idle_cb_ = nullptr; - })); + EXPECT_CALL(*cp2, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .WillOnce(Invoke([&]() { + cp2->idle_cb_(); + cp2->idle_cb_ = nullptr; + })); + EXPECT_CALL(*cp1, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .WillOnce(Invoke([&]() { + cp1->idle_cb_(); + cp1->idle_cb_ = nullptr; + })); + EXPECT_CALL(*tcp1, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .WillOnce(Invoke([&]() { + tcp1->idle_cb_(); + tcp1->idle_cb_ 
= nullptr; + })); + EXPECT_CALL(*tcp2, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .WillOnce(Invoke([&]() { + tcp2->idle_cb_(); + tcp2->idle_cb_ = nullptr; + })); HostVector hosts_removed; hosts_removed.push_back(host2); @@ -4641,14 +4655,16 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { HostVector hosts_added; hosts_added.push_back(host3); - EXPECT_CALL(*cp1, startDrain()).WillOnce(Invoke([&]() { - cp1->idle_cb_(); - cp1->idle_cb_ = nullptr; - })); - EXPECT_CALL(*tcp1, startDrain()).WillOnce(Invoke([&]() { - tcp1->idle_cb_(); - tcp1->idle_cb_ = nullptr; - })); + EXPECT_CALL(*cp1, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .WillOnce(Invoke([&]() { + cp1->idle_cb_(); + cp1->idle_cb_ = nullptr; + })); + EXPECT_CALL(*tcp1, drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete)) + .WillOnce(Invoke([&]() { + tcp1->idle_cb_(); + tcp1->idle_cb_ = nullptr; + })); // Adding host3 should drain connection pool for host1. cluster.prioritySet().updateHosts( @@ -4712,8 +4728,12 @@ TEST_F(ClusterManagerImplTest, ConnPoolsNotDrainedOnHostSetChange) { hosts_added.push_back(host2); // No connection pools should be drained. - EXPECT_CALL(*cp1, drainConnections()).Times(0); - EXPECT_CALL(*tcp1, drainConnections()).Times(0); + EXPECT_CALL(*cp1, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)) + .Times(0); + EXPECT_CALL(*tcp1, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)) + .Times(0); // No connection pools should be drained. 
cluster.prioritySet().updateHosts( diff --git a/test/common/upstream/conn_pool_map_impl_test.cc b/test/common/upstream/conn_pool_map_impl_test.cc index 5b8cd99f2b78b..8ee79ece9599c 100644 --- a/test/common/upstream/conn_pool_map_impl_test.cc +++ b/test/common/upstream/conn_pool_map_impl_test.cc @@ -160,7 +160,7 @@ TEST_F(ConnPoolMapImplTest, CallbacksPassedToPools) { ReadyWatcher watcher; test_map->addIdleCallback([&watcher]() { watcher.ready(); }); - test_map->startDrain(); + test_map->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); EXPECT_CALL(watcher, ready()).Times(2); cb1(); @@ -173,7 +173,7 @@ TEST_F(ConnPoolMapImplTest, CallbacksCachedAndPassedOnCreation) { ReadyWatcher watcher; test_map->addIdleCallback([&watcher]() { watcher.ready(); }); - test_map->startDrain(); + test_map->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainAndDelete); Http::ConnectionPool::Instance::IdleCb cb1; test_map->getPool(1, getFactoryExpectIdleCb(&cb1)); @@ -189,7 +189,7 @@ TEST_F(ConnPoolMapImplTest, CallbacksCachedAndPassedOnCreation) { // Tests that if we drain connections on an empty map, nothing happens. TEST_F(ConnPoolMapImplTest, EmptyMapDrainConnectionsNop) { TestMapPtr test_map = makeTestMap(); - test_map->drainConnections(); + test_map->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); } // Tests that we forward drainConnections to the pools. 
@@ -198,10 +198,12 @@ TEST_F(ConnPoolMapImplTest, DrainConnectionsForwarded) { test_map->getPool(1, getBasicFactory()); test_map->getPool(2, getBasicFactory()); - EXPECT_CALL(*mock_pools_[0], drainConnections()); - EXPECT_CALL(*mock_pools_[1], drainConnections()); + EXPECT_CALL(*mock_pools_[0], + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); + EXPECT_CALL(*mock_pools_[1], + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); - test_map->drainConnections(); + test_map->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); } TEST_F(ConnPoolMapImplTest, ClearDefersDelete) { diff --git a/test/common/upstream/load_balancer_impl_test.cc b/test/common/upstream/load_balancer_impl_test.cc index dd3f8de8d86f5..dea39058ef38c 100644 --- a/test/common/upstream/load_balancer_impl_test.cc +++ b/test/common/upstream/load_balancer_impl_test.cc @@ -14,6 +14,7 @@ #include "test/mocks/common.h" #include "test/mocks/runtime/mocks.h" #include "test/mocks/upstream/cluster_info.h" +#include "test/mocks/upstream/host.h" #include "test/mocks/upstream/host_set.h" #include "test/mocks/upstream/load_balancer_context.h" #include "test/mocks/upstream/priority_set.h" @@ -33,6 +34,10 @@ namespace Envoy { namespace Upstream { namespace { +static constexpr uint32_t UnhealthyStatus = 1u << static_cast(Host::Health::Unhealthy); +static constexpr uint32_t DegradedStatus = 1u << static_cast(Host::Health::Degraded); +static constexpr uint32_t HealthyStatus = 1u << static_cast(Host::Health::Healthy); + class LoadBalancerTestBase : public Event::TestUsingSimulatedTime, public testing::TestWithParam { protected: @@ -70,9 +75,8 @@ class TestLb : public LoadBalancerBase { using LoadBalancerBase::percentageDegradedLoad; using LoadBalancerBase::percentageLoad; - HostConstSharedPtr chooseHostOnce(LoadBalancerContext*) override { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; - } + HostConstSharedPtr 
chooseHost(LoadBalancerContext*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } @@ -540,6 +544,100 @@ TEST_P(LoadBalancerBaseTest, BoundaryConditions) { } } +class TestZoneAwareLb : public ZoneAwareLoadBalancerBase { +public: + TestZoneAwareLb(const PrioritySet& priority_set, ClusterStats& stats, Runtime::Loader& runtime, + Random::RandomGenerator& random, + const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config) + : ZoneAwareLoadBalancerBase(priority_set, nullptr, stats, runtime, random, common_config) {} + + // ZoneAwareLoadBalancerBase will keep a copy of cross priority host map shared pointer and update + // it when the membership is updated. + const HostMapConstSharedPtr& crossPriorityHostMapForTest() { return cross_priority_host_map_; } + + HostConstSharedPtr chooseHostOnce(LoadBalancerContext*) override { + return choose_host_once_host_; + } + HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } + + HostConstSharedPtr choose_host_once_host_{std::make_shared>()}; +}; + +// Used to test common functions of ZoneAwareLoadBalancerBase. +class ZoneAwareLoadBalancerBaseTest : public LoadBalancerTestBase { +public: + envoy::config::cluster::v3::Cluster::CommonLbConfig common_config_; + TestZoneAwareLb lb_{priority_set_, stats_, runtime_, random_, common_config_}; +}; + +TEST_F(ZoneAwareLoadBalancerBaseTest, CrossPriorityHostMapUpdate) { + // Fake cross priority host map. + auto host_map = std::make_shared(); + priority_set_.cross_priority_host_map_ = host_map; + + // Mock membership update and priority update callbacks will be executed. + host_set_.runCallbacks({}, {}); + + // Host map in the lb is updated. 
+ EXPECT_EQ(host_map.get(), lb_.crossPriorityHostMapForTest().get()); +} + +TEST_F(ZoneAwareLoadBalancerBaseTest, SelectOverrideHostTestInLb) { + NiceMock context; + + { + LoadBalancerContext::OverrideHost override_host{"1.2.3.4", HealthyStatus | DegradedStatus}; + EXPECT_CALL(context, overrideHostToSelect()) + .WillOnce(Return(absl::make_optional(override_host))); + + // Mock membership update and update host map shared pointer in the lb. + auto host_map = std::make_shared(); + priority_set_.cross_priority_host_map_ = host_map; + host_set_.runCallbacks({}, {}); + + // The expected host does not exist in the host map, therefore `chooseHostOnce` will be called. + EXPECT_EQ(lb_.choose_host_once_host_, lb_.chooseHost(&context)); + } + + { + auto mock_host = std::make_shared>(); + EXPECT_CALL(*mock_host, health()).WillOnce(Return(Host::Health::Unhealthy)); + + LoadBalancerContext::OverrideHost override_host{"1.2.3.4", HealthyStatus | DegradedStatus}; + EXPECT_CALL(context, overrideHostToSelect()) + .WillOnce(Return(absl::make_optional(override_host))); + + // Mock membership update and update host map shared pointer in the lb. + auto host_map = std::make_shared(); + host_map->insert({"1.2.3.4", mock_host}); + priority_set_.cross_priority_host_map_ = host_map; + host_set_.runCallbacks({}, {}); + + // Host status does not match the expected host status, therefore `chooseHostOnce` will be + // called. + EXPECT_EQ(lb_.choose_host_once_host_, lb_.chooseHost(&context)); + } + + { + auto mock_host = std::make_shared>(); + EXPECT_CALL(*mock_host, health()).WillOnce(Return(Host::Health::Degraded)); + + LoadBalancerContext::OverrideHost override_host{"1.2.3.4", HealthyStatus | DegradedStatus}; + EXPECT_CALL(context, overrideHostToSelect()) + .WillOnce(Return(absl::make_optional(override_host))); + + // Mock membership update and update host map shared pointer in the lb. 
+ auto host_map = std::make_shared(); + host_map->insert({"1.2.3.4", mock_host}); + priority_set_.cross_priority_host_map_ = host_map; + host_set_.runCallbacks({}, {}); + + EXPECT_EQ(mock_host, lb_.chooseHost(&context)); + } +} + class RoundRobinLoadBalancerTest : public LoadBalancerTestBase { public: void init(bool need_local_cluster) { @@ -1904,6 +2002,92 @@ TEST(LoadBalancerSubsetInfoImplTest, KeysSubsetEqualKeysInvalid) { "fallback_keys_subset cannot be equal to keys"); } +TEST(LoadBalancerContextBaseTest, LoadBalancerContextBaseTest) { + { + LoadBalancerContextBase context; + MockPrioritySet mock_priority_set; + HealthyAndDegradedLoad priority_load{Upstream::HealthyLoad({100, 0, 0}), + Upstream::DegradedLoad({0, 0, 0})}; + RetryPriority::PriorityMappingFunc empty_func = + [](const Upstream::HostDescription&) -> absl::optional { return absl::nullopt; }; + MockHost mock_host; + + EXPECT_EQ(absl::nullopt, context.computeHashKey()); + EXPECT_EQ(nullptr, context.downstreamConnection()); + EXPECT_EQ(nullptr, context.metadataMatchCriteria()); + EXPECT_EQ(nullptr, context.downstreamHeaders()); + + EXPECT_EQ(&priority_load, + &(context.determinePriorityLoad(mock_priority_set, priority_load, empty_func))); + EXPECT_EQ(false, context.shouldSelectAnotherHost(mock_host)); + EXPECT_EQ(1, context.hostSelectionRetryCount()); + EXPECT_EQ(nullptr, context.upstreamSocketOptions()); + EXPECT_EQ(nullptr, context.upstreamTransportSocketOptions()); + EXPECT_EQ(absl::nullopt, context.overrideHostToSelect()); + } + + EXPECT_TRUE(LoadBalancerContextBase::validateOverrideHostStatus(Host::Health::Unhealthy, + UnhealthyStatus)); + EXPECT_TRUE( + LoadBalancerContextBase::validateOverrideHostStatus(Host::Health::Healthy, HealthyStatus)); + EXPECT_FALSE( + LoadBalancerContextBase::validateOverrideHostStatus(Host::Health::Healthy, UnhealthyStatus)); +} + +TEST(LoadBalancerContextBaseTest, selectOverrideHostTest) { + NiceMock context; + + { + // No valid host map. 
+ EXPECT_EQ(nullptr, LoadBalancerContextBase::selectOverrideHost(nullptr, &context)); + } + { + // No valid load balancer context. + auto host_map = std::make_shared(); + EXPECT_EQ(nullptr, LoadBalancerContextBase::selectOverrideHost(host_map.get(), nullptr)); + } + { + // No valid expected host. + EXPECT_CALL(context, overrideHostToSelect()).WillOnce(Return(absl::nullopt)); + auto host_map = std::make_shared(); + EXPECT_EQ(nullptr, LoadBalancerContextBase::selectOverrideHost(host_map.get(), &context)); + } + { + // The host map does not contain the expected host. + LoadBalancerContext::OverrideHost override_host{"1.2.3.4", HealthyStatus}; + EXPECT_CALL(context, overrideHostToSelect()) + .WillOnce(Return(absl::make_optional(override_host))); + auto host_map = std::make_shared(); + EXPECT_EQ(nullptr, LoadBalancerContextBase::selectOverrideHost(host_map.get(), &context)); + } + { + // The status of host is not as expected. + auto mock_host = std::make_shared>(); + EXPECT_CALL(*mock_host, health()).WillOnce(Return(Host::Health::Unhealthy)); + + LoadBalancerContext::OverrideHost override_host{"1.2.3.4", HealthyStatus}; + EXPECT_CALL(context, overrideHostToSelect()) + .WillOnce(Return(absl::make_optional(override_host))); + + auto host_map = std::make_shared(); + host_map->insert({"1.2.3.4", mock_host}); + EXPECT_EQ(nullptr, LoadBalancerContextBase::selectOverrideHost(host_map.get(), &context)); + } + { + // Get expected host. 
+ auto mock_host = std::make_shared>(); + EXPECT_CALL(*mock_host, health()).WillOnce(Return(Host::Health::Degraded)); + + LoadBalancerContext::OverrideHost override_host{"1.2.3.4", HealthyStatus | DegradedStatus}; + EXPECT_CALL(context, overrideHostToSelect()) + .WillOnce(Return(absl::make_optional(override_host))); + + auto host_map = std::make_shared(); + host_map->insert({"1.2.3.4", mock_host}); + EXPECT_EQ(mock_host, LoadBalancerContextBase::selectOverrideHost(host_map.get(), &context)); + } +} + } // namespace } // namespace Upstream } // namespace Envoy diff --git a/test/common/upstream/maglev_lb_test.cc b/test/common/upstream/maglev_lb_test.cc index 46315da016dbf..562d4f0e90fa6 100644 --- a/test/common/upstream/maglev_lb_test.cc +++ b/test/common/upstream/maglev_lb_test.cc @@ -7,7 +7,9 @@ #include "test/common/upstream/utility.h" #include "test/mocks/common.h" #include "test/mocks/upstream/cluster_info.h" +#include "test/mocks/upstream/host.h" #include "test/mocks/upstream/host_set.h" +#include "test/mocks/upstream/load_balancer_context.h" #include "test/mocks/upstream/priority_set.h" #include "test/test_common/simulated_time_system.h" @@ -78,6 +80,29 @@ TEST_F(MaglevLoadBalancerTest, NoHost) { EXPECT_EQ(nullptr, lb_->factory()->create()->chooseHost(nullptr)); }; +TEST_F(MaglevLoadBalancerTest, SelectOverrideHost) { + init(7); + + NiceMock context; + + auto mock_host = std::make_shared>(); + EXPECT_CALL(*mock_host, health()).WillOnce(testing::Return(Host::Health::Degraded)); + + LoadBalancerContext::OverrideHost expected_host{ + "1.2.3.4", 1u << static_cast(Host::Health::Healthy) | + 1u << static_cast(Host::Health::Degraded)}; + EXPECT_CALL(context, overrideHostToSelect()) + .WillOnce(testing::Return(absl::make_optional(expected_host))); + + // Mock membership update and update host map shared pointer in the lb. 
+ auto host_map = std::make_shared(); + host_map->insert({"1.2.3.4", mock_host}); + priority_set_.cross_priority_host_map_ = host_map; + host_set_.runCallbacks({}, {}); + + EXPECT_EQ(mock_host, lb_->factory()->create()->chooseHost(&context)); +} + // Throws an exception if table size is not a prime number. TEST_F(MaglevLoadBalancerTest, NoPrimeNumber) { EXPECT_THROW_WITH_MESSAGE(init(8), EnvoyException, diff --git a/test/common/upstream/priority_conn_pool_map_impl_test.cc b/test/common/upstream/priority_conn_pool_map_impl_test.cc index a7ade68348547..db48e6afc799d 100644 --- a/test/common/upstream/priority_conn_pool_map_impl_test.cc +++ b/test/common/upstream/priority_conn_pool_map_impl_test.cc @@ -156,10 +156,12 @@ TEST_F(PriorityConnPoolMapImplTest, TestDrainConnectionsProxiedThrough) { test_map->getPool(ResourcePriority::High, 0, getBasicFactory()); test_map->getPool(ResourcePriority::Default, 0, getBasicFactory()); - EXPECT_CALL(*mock_pools_[0], drainConnections()); - EXPECT_CALL(*mock_pools_[1], drainConnections()); + EXPECT_CALL(*mock_pools_[0], + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); + EXPECT_CALL(*mock_pools_[1], + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); - test_map->drainConnections(); + test_map->drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections); } } // namespace diff --git a/test/common/upstream/ring_hash_lb_test.cc b/test/common/upstream/ring_hash_lb_test.cc index e9071eb1f5a25..c259f610c54af 100644 --- a/test/common/upstream/ring_hash_lb_test.cc +++ b/test/common/upstream/ring_hash_lb_test.cc @@ -14,7 +14,9 @@ #include "test/mocks/common.h" #include "test/mocks/runtime/mocks.h" #include "test/mocks/upstream/cluster_info.h" +#include "test/mocks/upstream/host.h" #include "test/mocks/upstream/host_set.h" +#include "test/mocks/upstream/load_balancer_context.h" #include "test/mocks/upstream/priority_set.h" #include 
"test/test_common/simulated_time_system.h" @@ -96,6 +98,28 @@ TEST_P(RingHashLoadBalancerTest, NoHost) { EXPECT_EQ(nullptr, lb_->factory()->create()->chooseHost(nullptr)); }; +TEST_P(RingHashLoadBalancerTest, SelectOverrideHost) { + init(); + + NiceMock context; + + auto mock_host = std::make_shared>(); + EXPECT_CALL(*mock_host, health()).WillOnce(Return(Host::Health::Degraded)); + + LoadBalancerContext::OverrideHost expected_host{ + "1.2.3.4", 1u << static_cast(Host::Health::Healthy) | + 1u << static_cast(Host::Health::Degraded)}; + EXPECT_CALL(context, overrideHostToSelect()).WillOnce(Return(absl::make_optional(expected_host))); + + // Mock membership update and update host map shared pointer in the lb. + auto host_map = std::make_shared(); + host_map->insert({"1.2.3.4", mock_host}); + priority_set_.cross_priority_host_map_ = host_map; + host_set_.runCallbacks({}, {}); + + EXPECT_EQ(mock_host, lb_->factory()->create()->chooseHost(&context)); +} + // Given minimum_ring_size > maximum_ring_size, expect an exception. 
TEST_P(RingHashLoadBalancerTest, BadRingSizeBounds) { config_ = envoy::config::cluster::v3::Cluster::RingHashLbConfig(); diff --git a/test/common/upstream/subset_lb_test.cc b/test/common/upstream/subset_lb_test.cc index 8d5204287ea5d..169202a448032 100644 --- a/test/common/upstream/subset_lb_test.cc +++ b/test/common/upstream/subset_lb_test.cc @@ -22,6 +22,7 @@ #include "test/mocks/upstream/host.h" #include "test/mocks/upstream/host_set.h" #include "test/mocks/upstream/load_balancer.h" +#include "test/mocks/upstream/load_balancer_context.h" #include "test/mocks/upstream/priority_set.h" #include "test/test_common/simulated_time_system.h" @@ -155,6 +156,8 @@ class SubsetLoadBalancerTest : public Event::TestUsingSimulatedTime, host_set.hosts_per_locality_ = makeHostsPerLocality({hosts}); host_set.healthy_hosts_ = host_set.hosts_; host_set.healthy_hosts_per_locality_ = host_set.hosts_per_locality_; + + host_set.runCallbacks({}, {}); } void configureWeightedHostSet(const HostURLMetadataMap& first_locality_host_metadata, @@ -496,6 +499,28 @@ TEST_F(SubsetLoadBalancerTest, NoFallback) { EXPECT_EQ(0U, stats_.lb_subsets_selected_.value()); } +TEST_F(SubsetLoadBalancerTest, SelectOverrideHost) { + init(); + + NiceMock context; + + auto mock_host = std::make_shared>(); + EXPECT_CALL(*mock_host, health()).WillOnce(Return(Host::Health::Degraded)); + + LoadBalancerContext::OverrideHost expected_host{ + "1.2.3.4", 1u << static_cast(Host::Health::Healthy) | + 1u << static_cast(Host::Health::Degraded)}; + EXPECT_CALL(context, overrideHostToSelect()).WillOnce(Return(absl::make_optional(expected_host))); + + // Mock membership update and update host map shared pointer in the lb. 
+ auto host_map = std::make_shared(); + host_map->insert({"1.2.3.4", mock_host}); + priority_set_.cross_priority_host_map_ = host_map; + configureHostSet({{"tcp://127.0.0.1:80", {{"version", "1.0"}}}}, host_set_); + + EXPECT_EQ(mock_host, lb_->chooseHost(&context)); +} + // Validate that SubsetLoadBalancer unregisters its priority set member update // callback. Regression for heap-use-after-free. TEST_F(SubsetLoadBalancerTest, DeregisterCallbacks) { diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index eae609db2a7e7..f84e0342e52c4 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -15,7 +15,13 @@ namespace Envoy { +using envoy::config::route::v3::Route; +using envoy::config::route::v3::VirtualHost; +using envoy::extensions::filters::http::ext_proc::v3alpha::ExtProcPerRoute; using envoy::extensions::filters::http::ext_proc::v3alpha::ProcessingMode; +using envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager; +using Envoy::Protobuf::MapPair; +using Envoy::ProtobufWkt::Any; using envoy::service::ext_proc::v3alpha::BodyResponse; using envoy::service::ext_proc::v3alpha::CommonResponse; using envoy::service::ext_proc::v3alpha::HeadersResponse; @@ -86,21 +92,35 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, setDownstreamProtocol(Http::CodecType::HTTP2); } + void setPerRouteConfig(Route* route, const ExtProcPerRoute& cfg) { + Any cfg_any; + ASSERT_TRUE(cfg_any.PackFrom(cfg)); + route->mutable_typed_per_filter_config()->insert( + MapPair("envoy.filters.http.ext_proc", cfg_any)); + } + + void setPerHostConfig(VirtualHost& vh, const ExtProcPerRoute& cfg) { + Any cfg_any; + ASSERT_TRUE(cfg_any.PackFrom(cfg)); + vh.mutable_typed_per_filter_config()->insert( + MapPair("envoy.filters.http.ext_proc", cfg_any)); + } + 
IntegrationStreamDecoderPtr sendDownstreamRequest( - absl::optional> modify_headers) { + absl::optional> modify_headers) { auto conn = makeClientConnection(lookupPort("http")); codec_client_ = makeHttpConnection(std::move(conn)); Http::TestRequestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); if (modify_headers) { (*modify_headers)(headers); } - HttpTestUtility::addDefaultHeaders(headers); return codec_client_->makeHeaderOnlyRequest(headers); } IntegrationStreamDecoderPtr sendDownstreamRequestWithBody( absl::string_view body, - absl::optional> modify_headers) { + absl::optional> modify_headers) { auto conn = makeClientConnection(lookupPort("http")); codec_client_ = makeHttpConnection(std::move(conn)); Http::TestRequestHeaderMapImpl headers; @@ -1290,4 +1310,101 @@ TEST_P(ExtProcIntegrationTest, BufferBodyOverridePostWithRequestBody) { verifyDownstreamResponse(*response, 200); } +// Set up per-route configuration that sets a custom processing mode on the +// route, and test that the processing mode takes effect. 
+TEST_P(ExtProcIntegrationTest, PerRouteProcessingMode) { + initializeConfig(); + config_helper_.addConfigModifier([this](HttpConnectionManager& cm) { + // Set up "/foo" so that it will send a buffered body + auto* vh = cm.mutable_route_config()->mutable_virtual_hosts()->Mutable(0); + auto* route = vh->mutable_routes()->Mutable(0); + route->mutable_match()->set_path("/foo"); + ExtProcPerRoute per_route; + per_route.mutable_overrides()->mutable_processing_mode()->set_response_body_mode( + ProcessingMode::BUFFERED); + setPerRouteConfig(route, per_route); + }); + HttpIntegrationTest::initialize(); + + auto response = + sendDownstreamRequest([](Http::RequestHeaderMap& headers) { headers.setPath("/foo"); }); + processRequestHeadersMessage(true, absl::nullopt); + Buffer::OwnedImpl full_response; + TestUtility::feedBufferWithRandomCharacters(full_response, 100); + handleUpstreamRequestWithResponse(full_response, 100); + processResponseHeadersMessage(false, absl::nullopt); + // Because of the per-route config we should get a buffered response + processResponseBodyMessage(false, [&full_response](const HttpBody& body, BodyResponse&) { + EXPECT_TRUE(body.end_of_stream()); + EXPECT_EQ(body.body(), full_response.toString()); + return true; + }); + verifyDownstreamResponse(*response, 200); +} + +// Set up configuration on the virtual host and on the route and see that +// the two are merged. 
+TEST_P(ExtProcIntegrationTest, PerRouteAndHostProcessingMode) { + initializeConfig(); + config_helper_.addConfigModifier([this](HttpConnectionManager& cm) { + auto* vh = cm.mutable_route_config()->mutable_virtual_hosts()->Mutable(0); + // Set up a processing mode for the host that should not be honored + ExtProcPerRoute per_host; + per_host.mutable_overrides()->mutable_processing_mode()->set_request_header_mode( + ProcessingMode::SKIP); + per_host.mutable_overrides()->mutable_processing_mode()->set_response_header_mode( + ProcessingMode::SKIP); + setPerHostConfig(*vh, per_host); + + // Set up "/foo" so that it will send a buffered body + auto* route = vh->mutable_routes()->Mutable(0); + route->mutable_match()->set_path("/foo"); + ExtProcPerRoute per_route; + per_route.mutable_overrides()->mutable_processing_mode()->set_response_body_mode( + ProcessingMode::BUFFERED); + setPerRouteConfig(route, per_route); + }); + HttpIntegrationTest::initialize(); + + auto response = + sendDownstreamRequest([](Http::RequestHeaderMap& headers) { headers.setPath("/foo"); }); + processRequestHeadersMessage(true, absl::nullopt); + Buffer::OwnedImpl full_response; + TestUtility::feedBufferWithRandomCharacters(full_response, 100); + handleUpstreamRequestWithResponse(full_response, 100); + processResponseHeadersMessage(false, absl::nullopt); + // Because of the per-route config we should get a buffered response. + // If the config from the host is applied then this won't work. + processResponseBodyMessage(false, [&full_response](const HttpBody& body, BodyResponse&) { + EXPECT_TRUE(body.end_of_stream()); + EXPECT_EQ(body.body(), full_response.toString()); + return true; + }); + verifyDownstreamResponse(*response, 200); +} + +// Set up per-route configuration that disables ext_proc for a route and ensure +// that it is not called. 
+TEST_P(ExtProcIntegrationTest, PerRouteDisable) { + initializeConfig(); + config_helper_.addConfigModifier([this](HttpConnectionManager& cm) { + // Set up "/foo" so that ext_proc is disabled + auto* vh = cm.mutable_route_config()->mutable_virtual_hosts()->Mutable(0); + auto* route = vh->mutable_routes()->Mutable(0); + route->mutable_match()->set_path("/foo"); + ExtProcPerRoute per_route; + per_route.set_disabled(true); + setPerRouteConfig(route, per_route); + }); + HttpIntegrationTest::initialize(); + + auto response = + sendDownstreamRequest([](Http::RequestHeaderMap& headers) { headers.setPath("/foo"); }); + // There should be no ext_proc processing here + Buffer::OwnedImpl full_response; + TestUtility::feedBufferWithRandomCharacters(full_response, 100); + handleUpstreamRequestWithResponse(full_response, 100); + verifyDownstreamResponse(*response, 200); +} + } // namespace Envoy diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index df872b98a3580..86f1803d088ac 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -24,6 +24,7 @@ namespace HttpFilters { namespace ExternalProcessing { namespace { +using envoy::extensions::filters::http::ext_proc::v3alpha::ExtProcPerRoute; using envoy::extensions::filters::http::ext_proc::v3alpha::ProcessingMode; using envoy::service::ext_proc::v3alpha::BodyResponse; using envoy::service::ext_proc::v3alpha::CommonResponse; @@ -55,9 +56,11 @@ class HttpFilterTest : public testing::Test { protected: void initialize(std::string&& yaml) { client_ = std::make_unique(); + route_ = std::make_shared>(); EXPECT_CALL(*client_, start(_)).WillOnce(Invoke(this, &HttpFilterTest::doStart)); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); + EXPECT_CALL(decoder_callbacks_, 
route()).WillRepeatedly(Return(route_)); EXPECT_CALL(dispatcher_, createTimer_(_)) .Times(AnyNumber()) .WillRepeatedly(Invoke([this](Unused) { @@ -240,6 +243,7 @@ class HttpFilterTest : public testing::Test { testing::NiceMock dispatcher_; Http::MockStreamDecoderFilterCallbacks decoder_callbacks_; Http::MockStreamEncoderFilterCallbacks encoder_callbacks_; + Router::RouteConstSharedPtr route_; Http::TestRequestHeaderMapImpl request_headers_; Http::TestResponseHeaderMapImpl response_headers_; Http::TestRequestTrailerMapImpl request_trailers_; @@ -2017,6 +2021,70 @@ TEST_F(HttpFilterTest, OutOfOrder) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// When merging two configurations, ensure that the second processing mode +// overrides the first. +TEST(OverrideTest, OverrideProcessingMode) { + ExtProcPerRoute cfg1; + cfg1.mutable_overrides()->mutable_processing_mode()->set_request_header_mode( + ProcessingMode::SKIP); + ExtProcPerRoute cfg2; + cfg2.mutable_overrides()->mutable_processing_mode()->set_request_body_mode( + ProcessingMode::STREAMED); + cfg2.mutable_overrides()->mutable_processing_mode()->set_response_body_mode( + ProcessingMode::BUFFERED); + FilterConfigPerRoute route1(cfg1); + FilterConfigPerRoute route2(cfg2); + route1.merge(route2); + EXPECT_FALSE(route1.disabled()); + EXPECT_EQ(route1.processingMode()->request_header_mode(), ProcessingMode::DEFAULT); + EXPECT_EQ(route1.processingMode()->request_body_mode(), ProcessingMode::STREAMED); + EXPECT_EQ(route1.processingMode()->response_body_mode(), ProcessingMode::BUFFERED); +} + +// When merging two configurations, if the first processing mode is set, and +// the second is disabled, then the filter should be disabled. 
+TEST(OverrideTest, DisableOverridesFirstMode) { + ExtProcPerRoute cfg1; + cfg1.mutable_overrides()->mutable_processing_mode()->set_request_header_mode( + ProcessingMode::SKIP); + ExtProcPerRoute cfg2; + cfg2.set_disabled(true); + FilterConfigPerRoute route1(cfg1); + FilterConfigPerRoute route2(cfg2); + route1.merge(route2); + EXPECT_TRUE(route1.disabled()); + EXPECT_FALSE(route1.processingMode()); +} + +// When merging two configurations, if the first override is disabled, and +// the second has a new mode, then the filter should use the new mode. +TEST(OverrideTest, ModeOverridesFirstDisable) { + ExtProcPerRoute cfg1; + cfg1.set_disabled(true); + ExtProcPerRoute cfg2; + cfg2.mutable_overrides()->mutable_processing_mode()->set_request_header_mode( + ProcessingMode::SKIP); + FilterConfigPerRoute route1(cfg1); + FilterConfigPerRoute route2(cfg2); + route1.merge(route2); + EXPECT_FALSE(route1.disabled()); + EXPECT_EQ(route1.processingMode()->request_header_mode(), ProcessingMode::SKIP); +} + +// When merging two configurations, if both are disabled, then it's still +// disabled. 
+TEST(OverrideTest, DisabledThingsAreDisabled) { + ExtProcPerRoute cfg1; + cfg1.set_disabled(true); + ExtProcPerRoute cfg2; + cfg2.set_disabled(true); + FilterConfigPerRoute route1(cfg1); + FilterConfigPerRoute route2(cfg2); + route1.merge(route2); + EXPECT_TRUE(route1.disabled()); + EXPECT_FALSE(route1.processingMode()); +} + } // namespace } // namespace ExternalProcessing } // namespace HttpFilters diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index af3cf936c1111..edc477994513e 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -55,9 +55,11 @@ class OrderingTest : public testing::Test { void initialize(absl::optional> cb) { client_ = std::make_unique(); + route_ = std::make_shared>(); EXPECT_CALL(*client_, start(_)).WillOnce(Invoke(this, &OrderingTest::doStart)); EXPECT_CALL(encoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); EXPECT_CALL(decoder_callbacks_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_)); + EXPECT_CALL(decoder_callbacks_, route()).WillRepeatedly(Return(route_)); ExternalProcessor proto_config; proto_config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("ext_proc_server"); @@ -200,6 +202,7 @@ class OrderingTest : public testing::Test { FilterConfigSharedPtr config_; std::unique_ptr filter_; NiceMock dispatcher_; + Router::RouteConstSharedPtr route_; Http::MockStreamDecoderFilterCallbacks decoder_callbacks_; Http::MockStreamEncoderFilterCallbacks encoder_callbacks_; Http::TestRequestHeaderMapImpl request_headers_; diff --git a/test/extensions/filters/network/dubbo_proxy/dubbo_protocol_impl_test.cc b/test/extensions/filters/network/dubbo_proxy/dubbo_protocol_impl_test.cc index 9f57e392df534..19b49d1dbbb2e 100644 --- a/test/extensions/filters/network/dubbo_proxy/dubbo_protocol_impl_test.cc +++ 
b/test/extensions/filters/network/dubbo_proxy/dubbo_protocol_impl_test.cc @@ -151,6 +151,21 @@ TEST(DubboProtocolImplTest, encode) { EXPECT_TRUE(dubbo_protocol.decodeData(buffer, context, output_metadata)); } +TEST(DubboProtocolImplTest, HeartBeatResponseTest) { + MessageMetadata metadata; + metadata.setMessageType(MessageType::HeartbeatResponse); + metadata.setResponseStatus(ResponseStatus::Ok); + metadata.setSerializationType(SerializationType::Hessian2); + metadata.setRequestId(100); + + Buffer::OwnedImpl buffer; + DubboProtocolImpl dubbo_protocol; + dubbo_protocol.initSerializer(SerializationType::Hessian2); + EXPECT_TRUE(dubbo_protocol.encode(buffer, metadata, "", RpcResponseType::ResponseWithValue)); + // 16 bytes header and one byte null object body. + EXPECT_EQ(17, buffer.length()); +} + TEST(DubboProtocolImplTest, decode) { Buffer::OwnedImpl buffer; MessageMetadataSharedPtr metadata; diff --git a/test/integration/server.h b/test/integration/server.h index da51efe2eabf7..8ad4c633d6892 100644 --- a/test/integration/server.h +++ b/test/integration/server.h @@ -477,11 +477,6 @@ class IntegrationTestServer : public Logger::Loggable, notifyingStatsAllocator().waitForCounterExists(name); } - // TODO(#17956): Add Gauge type to NotifyingAllocator and adopt it in this method. 
- void waitForGaugeDestroyed(const std::string& name) override { - ASSERT_TRUE(TestUtility::waitForGaugeDestroyed(statStore(), name, time_system_)); - } - void waitUntilHistogramHasSamples( const std::string& name, std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()) override { diff --git a/test/integration/server_stats.h b/test/integration/server_stats.h index d4520d3456db5..66cb7e07e7d27 100644 --- a/test/integration/server_stats.h +++ b/test/integration/server_stats.h @@ -66,12 +66,6 @@ class IntegrationTestServerStats { waitForGaugeEq(const std::string& name, uint64_t value, std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()) PURE; - /** - * Wait for a gauge to be destroyed. Note that MockStatStore does not destroy stat. - * @param name gauge name. - */ - virtual void waitForGaugeDestroyed(const std::string& name) PURE; - /** * Counter lookup. This is not thread safe, since we don't get a consistent * snapshot, uses counters() instead for this behavior. 
diff --git a/test/integration/xds_integration_test.cc b/test/integration/xds_integration_test.cc
index 8e866132782b7..7a11e6a156470 100644
--- a/test/integration/xds_integration_test.cc
+++ b/test/integration/xds_integration_test.cc
@@ -301,9 +301,6 @@ class LdsInplaceUpdateHttpIntegrationTest
     std::string tls_inspector_config = ConfigHelper::tlsInspectorFilter();
     config_helper_.addListenerFilter(tls_inspector_config);
     config_helper_.addSslConfig();
-    config_helper_.addConfigModifier(
-        [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
-                hcm) { hcm.mutable_stat_prefix()->assign("hcm0"); });
     config_helper_.addConfigModifier([this, add_default_filter_chain](
                                          envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
       if (!use_default_balancer_) {
@@ -338,7 +335,6 @@ class LdsInplaceUpdateHttpIntegrationTest
             ->mutable_routes(0)
             ->mutable_route()
             ->set_cluster("cluster_1");
-        hcm_config.mutable_stat_prefix()->assign("hcm1");
         config_blob->PackFrom(hcm_config);
         bootstrap.mutable_static_resources()->mutable_clusters()->Add()->MergeFrom(
             *bootstrap.mutable_static_resources()->mutable_clusters(0));
@@ -385,7 +381,7 @@ class LdsInplaceUpdateHttpIntegrationTest
     }
   }
 
-  void expectConnectionServed(std::string alpn = "alpn0") {
+  void expectConnectionServed(std::string alpn = "alpn0") {
     auto codec_client_after_config_update = createHttpCodec(alpn);
     expectResponseHeaderConnectionClose(*codec_client_after_config_update, false);
     codec_client_after_config_update->close();
@@ -399,7 +395,7 @@ class LdsInplaceUpdateHttpIntegrationTest
 };
 
 // Verify that http response on filter chain 1 and default filter chain have "Connection: close"
-// header when these 2 filter chains are deleted during the listener update.
+// header when these 2 filter chains are deleted during the listener update.
 TEST_P(LdsInplaceUpdateHttpIntegrationTest, ReloadConfigDeletingFilterChain) {
   inplaceInitialize(/*add_default_filter_chain=*/true);
 
@@ -407,6 +403,12 @@ TEST_P(LdsInplaceUpdateHttpIntegrationTest, ReloadConfigDeletingFilterChain) {
   auto codec_client_0 = createHttpCodec("alpn0");
   auto codec_client_default = createHttpCodec("alpndefault");
 
+  Cleanup cleanup([c1 = codec_client_1.get(), c0 = codec_client_0.get(),
+                   c_default = codec_client_default.get()]() {
+    c1->close();
+    c0->close();
+    c_default->close();
+  });
   ConfigHelper new_config_helper(
       version_, *api_, MessageUtil::getJsonStringFromMessageOrDie(config_helper_.bootstrap()));
   new_config_helper.addConfigModifier(
@@ -420,20 +422,12 @@ TEST_P(LdsInplaceUpdateHttpIntegrationTest, ReloadConfigDeletingFilterChain) {
   test_server_->waitForCounterGe("listener_manager.listener_in_place_updated", 1);
   test_server_->waitForGaugeGe("listener_manager.total_filter_chains_draining", 1);
 
-  test_server_->waitForGaugeGe("http.hcm0.downstream_cx_active", 1);
-  test_server_->waitForGaugeGe("http.hcm1.downstream_cx_active", 1);
-
   expectResponseHeaderConnectionClose(*codec_client_1, true);
   expectResponseHeaderConnectionClose(*codec_client_default, true);
 
   test_server_->waitForGaugeGe("listener_manager.total_filter_chains_draining", 0);
   expectResponseHeaderConnectionClose(*codec_client_0, false);
-  expectConnectionServed();
-
-  codec_client_1->close();
-  test_server_->waitForGaugeDestroyed("http.hcm1.downstream_cx_active");
-  codec_client_0->close();
-  codec_client_default->close();
+  expectConnectionServed();
 }
 
 // Verify that http clients of filter chain 0 survives if new listener config adds new filter
@@ -444,19 +438,15 @@ TEST_P(LdsInplaceUpdateHttpIntegrationTest, ReloadConfigAddingFilterChain) {
   auto codec_client_0 = createHttpCodec("alpn0");
   Cleanup cleanup0([c0 = codec_client_0.get()]() { c0->close(); });
 
-  test_server_->waitForGaugeGe("http.hcm0.downstream_cx_active", 1);
-
   ConfigHelper new_config_helper(
       version_, *api_, MessageUtil::getJsonStringFromMessageOrDie(config_helper_.bootstrap()));
   new_config_helper.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap)
                                           -> void {
     auto* listener = bootstrap.mutable_static_resources()->mutable_listeners(0);
-    // Note that HCM2 copies the stats prefix from HCM0
-    listener->mutable_filter_chains()->Add()->MergeFrom(*listener->mutable_filter_chains(0));
+    listener->mutable_filter_chains()->Add()->MergeFrom(*listener->mutable_filter_chains(1));
     *listener->mutable_filter_chains(2)
         ->mutable_filter_chain_match()
         ->mutable_application_protocols(0) = "alpn2";
-
     auto default_filter_chain =
         bootstrap.mutable_static_resources()->mutable_listeners(0)->mutable_default_filter_chain();
     default_filter_chain->MergeFrom(*listener->mutable_filter_chains(1));
@@ -468,9 +458,6 @@ TEST_P(LdsInplaceUpdateHttpIntegrationTest, ReloadConfigAddingFilterChain) {
   auto codec_client_2 = createHttpCodec("alpn2");
   auto codec_client_default = createHttpCodec("alpndefault");
 
-  // 1 connection from filter chain 0 and 1 connection from filter chain 2.
-  test_server_->waitForGaugeGe("http.hcm0.downstream_cx_active", 2);
-
   Cleanup cleanup2([c2 = codec_client_2.get(), c_default = codec_client_default.get()]() {
     c2->close();
     c_default->close();
@@ -478,7 +465,7 @@ TEST_P(LdsInplaceUpdateHttpIntegrationTest, ReloadConfigAddingFilterChain) {
   expectResponseHeaderConnectionClose(*codec_client_2, false);
   expectResponseHeaderConnectionClose(*codec_client_default, false);
   expectResponseHeaderConnectionClose(*codec_client_0, false);
-  expectConnectionServed();
+  expectConnectionServed();
 }
 
 // Verify that http clients of default filter chain is drained and recreated if the default filter
@@ -506,7 +493,7 @@ TEST_P(LdsInplaceUpdateHttpIntegrationTest, ReloadConfigUpdatingDefaultFilterCha
   Cleanup cleanup2([c_default_v3 = codec_client_default_v3.get()]() { c_default_v3->close(); });
   expectResponseHeaderConnectionClose(*codec_client_default, true);
   expectResponseHeaderConnectionClose(*codec_client_default_v3, false);
-  expectConnectionServed();
+  expectConnectionServed();
 }
 
 // Verify that balancer is inherited. Test only default balancer because ExactConnectionBalancer
@@ -528,7 +515,7 @@ TEST_P(LdsInplaceUpdateHttpIntegrationTest, OverlappingFilterChainServesNewConne
   new_config_helper.setLds("1");
   test_server_->waitForCounterGe("listener_manager.listener_in_place_updated", 1);
   expectResponseHeaderConnectionClose(*codec_client_0, false);
-  expectConnectionServed();
+  expectConnectionServed();
 }
 
 // Verify default filter chain update is filter chain only update.
diff --git a/test/mocks/http/conn_pool.h b/test/mocks/http/conn_pool.h index c1f55c2f92ef8..323eb7a087ac6 100644 --- a/test/mocks/http/conn_pool.h +++ b/test/mocks/http/conn_pool.h @@ -31,8 +31,7 @@ class MockInstance : public Instance { MOCK_METHOD(Http::Protocol, protocol, (), (const)); MOCK_METHOD(void, addIdleCallback, (IdleCb cb)); MOCK_METHOD(bool, isIdle, (), (const)); - MOCK_METHOD(void, startDrain, ()); - MOCK_METHOD(void, drainConnections, ()); + MOCK_METHOD(void, drainConnections, (Envoy::ConnectionPool::DrainBehavior drain_behavior)); MOCK_METHOD(bool, hasActiveConnections, (), (const)); MOCK_METHOD(Cancellable*, newStream, (ResponseDecoder & response_decoder, Callbacks& callbacks)); MOCK_METHOD(bool, maybePreconnect, (float)); diff --git a/test/mocks/tcp/mocks.h b/test/mocks/tcp/mocks.h index 75e79e7aea932..ff3e985e7bd6f 100644 --- a/test/mocks/tcp/mocks.h +++ b/test/mocks/tcp/mocks.h @@ -61,8 +61,7 @@ class MockInstance : public Instance { // Tcp::ConnectionPool::Instance MOCK_METHOD(void, addIdleCallback, (IdleCb cb)); MOCK_METHOD(bool, isIdle, (), (const)); - MOCK_METHOD(void, startDrain, ()); - MOCK_METHOD(void, drainConnections, ()); + MOCK_METHOD(void, drainConnections, (Envoy::ConnectionPool::DrainBehavior drain_behavior)); MOCK_METHOD(void, closeConnections, ()); MOCK_METHOD(Cancellable*, newConnection, (Tcp::ConnectionPool::Callbacks & callbacks)); MOCK_METHOD(bool, maybePreconnect, (float), ()); diff --git a/test/mocks/upstream/load_balancer_context.cc b/test/mocks/upstream/load_balancer_context.cc index 21e16847e8c37..737529ff4dbd5 100644 --- a/test/mocks/upstream/load_balancer_context.cc +++ b/test/mocks/upstream/load_balancer_context.cc @@ -1,6 +1,7 @@ #include "test/mocks/upstream/load_balancer_context.h" using testing::_; +using testing::Return; using testing::ReturnRef; namespace Envoy { @@ -11,6 +12,7 @@ MockLoadBalancerContext::MockLoadBalancerContext() { priority_load_.healthy_priority_load_ = HealthyLoad({100}); 
priority_load_.degraded_priority_load_ = DegradedLoad({0}); ON_CALL(*this, determinePriorityLoad(_, _, _)).WillByDefault(ReturnRef(priority_load_)); + ON_CALL(*this, overrideHostToSelect()).WillByDefault(Return(absl::nullopt)); } MockLoadBalancerContext::~MockLoadBalancerContext() = default; diff --git a/test/mocks/upstream/load_balancer_context.h b/test/mocks/upstream/load_balancer_context.h index 06959126953f5..8d0bc05a857a7 100644 --- a/test/mocks/upstream/load_balancer_context.h +++ b/test/mocks/upstream/load_balancer_context.h @@ -23,6 +23,7 @@ class MockLoadBalancerContext : public LoadBalancerContext { MOCK_METHOD(Network::Socket::OptionsSharedPtr, upstreamSocketOptions, (), (const)); MOCK_METHOD(Network::TransportSocketOptionsConstSharedPtr, upstreamTransportSocketOptions, (), (const)); + MOCK_METHOD(absl::optional, overrideHostToSelect, (), (const)); private: HealthyAndDegradedLoad priority_load_; diff --git a/test/mocks/upstream/priority_set.cc b/test/mocks/upstream/priority_set.cc index de2289ede221d..86423ec9ba552 100644 --- a/test/mocks/upstream/priority_set.cc +++ b/test/mocks/upstream/priority_set.cc @@ -11,7 +11,7 @@ namespace Upstream { using ::testing::_; using ::testing::Invoke; -using ::testing::Return; +using ::testing::ReturnPointee; using ::testing::ReturnRef; MockPrioritySet::MockPrioritySet() { @@ -26,7 +26,7 @@ MockPrioritySet::MockPrioritySet() { .WillByDefault(Invoke([this](PrioritySet::PriorityUpdateCb cb) -> Common::CallbackHandlePtr { return priority_update_cb_helper_.add(cb); })); - ON_CALL(*this, crossPriorityHostMap()).WillByDefault(Return(cross_priority_host_map_)); + ON_CALL(*this, crossPriorityHostMap()).WillByDefault(ReturnPointee(&cross_priority_host_map_)); } MockPrioritySet::~MockPrioritySet() = default; diff --git a/test/test_common/utility.cc b/test/test_common/utility.cc index c2f68cc000c6d..fe0db0bcf7ee8 100644 --- a/test/test_common/utility.cc +++ b/test/test_common/utility.cc @@ -238,14 +238,6 @@ AssertionResult 
TestUtility::waitForGaugeEq(Stats::Store& store, const std::stri
   return AssertionSuccess();
 }
 
-AssertionResult TestUtility::waitForGaugeDestroyed(Stats::Store& store, const std::string& name,
-                                                   Event::TestTimeSystem& time_system) {
-  while (findGauge(store, name) == nullptr) {
-    time_system.advanceTimeWait(std::chrono::milliseconds(10));
-  }
-  return AssertionSuccess();
-}
-
 AssertionResult TestUtility::waitUntilHistogramHasSamples(Stats::Store& store,
                                                           const std::string& name,
                                                           Event::TestTimeSystem& time_system,
diff --git a/test/test_common/utility.h b/test/test_common/utility.h
index e7fa952614c92..c355b50643394 100644
--- a/test/test_common/utility.h
+++ b/test/test_common/utility.h
@@ -277,17 +277,6 @@ class TestUtility {
                                          Event::TestTimeSystem& time_system,
                                          std::chrono::milliseconds timeout = std::chrono::milliseconds::zero());
 
-  /**
-   * Wait for a gauge to be destroyed.
-   * @param store supplies the stats store.
-   * @param name gauge name.
-   * @param time_system the time system to use for waiting.
-   * @return AssertionSuccess() if the gauge was == to the value within the timeout, else
-   * AssertionFailure().
-   */
-  static AssertionResult waitForGaugeDestroyed(Stats::Store& store, const std::string& name,
-                                               Event::TestTimeSystem& time_system);
-
   /**
    * Wait for a histogram to have samples.
    * @param store supplies the stats store.
diff --git a/tools/dependency/cve_scan.py b/tools/dependency/cve_scan.py
index ddb4663cb10ef..ca2d2a144253c 100755
--- a/tools/dependency/cve_scan.py
+++ b/tools/dependency/cve_scan.py
@@ -119,6 +119,11 @@ def gather_cpes(nodes, cpe_set):
       gather_cpes(cve['configurations']['nodes'], cpe_set)
     if len(cpe_set) == 0:
       continue
+
+    if "baseMetricV3" not in cve['impact']:
+      print(f"WARNING: ignoring v2 metric for {cve['cve']['CVE_data_meta']['ID']}")
+      continue
+
     cvss_v3_score = cve['impact']['baseMetricV3']['cvssV3']['baseScore']
     cvss_v3_severity = cve['impact']['baseMetricV3']['cvssV3']['baseSeverity']