diff --git a/docs/root/configuration/operations/overload_manager/overload_manager.rst b/docs/root/configuration/operations/overload_manager/overload_manager.rst index 53fab324e46c6..9835d18d45ca4 100644 --- a/docs/root/configuration/operations/overload_manager/overload_manager.rst +++ b/docs/root/configuration/operations/overload_manager/overload_manager.rst @@ -96,6 +96,10 @@ The following overload actions are supported: - Envoy will reduce the waiting period for a configured set of timeouts. See :ref:`below ` for details on configuration. + * - envoy.overload_actions.reset_streams + - Envoy will reset expensive streams to terminate them. See + :ref:`below ` for details on configuration. + .. _config_overload_manager_reducing_timeouts: Reducing timeouts @@ -163,6 +167,47 @@ all listeners. An example configuration can be found in the :ref:`edge best practices document `. +.. _config_overload_manager_reset_streams: + +Reset Streams +^^^^^^^^^^^^^^^^^ + +The ``envoy.overload_actions.reset_streams`` overload action will reset +expensive streams. This works in conjunction with the +``envoy.reloadable_features.per_stream_buffer_accounting`` flag which enables +per stream buffer accounting. + +As an example, here is a single overload action entry that enables reset streams: + +.. code-block:: yaml + + name: "envoy.overload_actions.reset_streams" + triggers: + - name: "envoy.resource_monitors.fixed_heap" + scaled: + scaling_threshold: 0.85 + saturation_threshold: 0.95 + +It configures the overload manager to reset certain streams depending on the +heap size. When the heap usage is less than 85%, no streams will be reset. +When heap usage is at or above 85%, we start to reset certain memory classes +(e.g. streams using memory within a power of two range). There are 8 buckets, +with the last bucket capturing all of the streams using :math:`>= 128 * +minimum_threshold_for_tracking`. The ``minimum_threshold_for_tracking`` can be +configured via :ref:`buffer_factory_config +`. 
+ +Given that there are only 8 buckets, we partition the space with a gradation of +:math:`gradation = (saturation_threshold - scaling_threshold)/8`. Hence at 85% +we reset streams in the last bucket. At :math:`85% + 1 * gradation` we reset +streams in the second to last, and last bucket. And so forth as memory pressure +is higher. + +It's expected that the first few gradations shouldn't trigger anything, unless +there's something seriously wrong e.g. the existence of streams using :math:`>= +128 * minimum_threshold_for_tracking`. + + Statistics ---------- diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index fe14c7af5c5e5..c61571261681d 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -62,6 +62,55 @@ New Features * listener: new listener metric `downstream_cx_transport_socket_connect_timeout` to track transport socket timeouts. * rbac: added :ref:`destination_port_range ` for matching range of destination ports. +* access_log: added the new response flag for :ref:`overload manager termination `. The response flag will be set when the http stream is terminated by overload manager. +* admission control: added :ref:`admission control ` option that when average RPS of the sampling window is below this threshold, the filter will not throttle requests. Added :ref:`admission control ` option to set an upper limit on the probability of rejection. +* bandwidth_limit: added new :ref:`HTTP bandwidth limit filter `. +* bootstrap: added :ref:`dns_resolution_config ` to aggregate all of the DNS resolver configuration in a single message. By setting one such configuration option ``no_default_search_domain`` as true the DNS resolver will not use the default search domains. And by setting the configuration ``resolvers`` we can specify the external DNS servers to be used for external DNS query. 
+* cluster: added :ref:`dns_resolution_config ` to aggregate all of the DNS resolver configuration in a single message. By setting one such configuration option ``no_default_search_domain`` as true the DNS resolver will not use the default search domains. +* cluster: added :ref:`host_rewrite_literal ` to WeightedCluster. +* cluster: added :ref:`wait_for_warm_on_init `, which allows cluster readiness to not block on cluster warm-up. It is true by default, which preserves existing behavior. Currently, only applicable for DNS-based clusters. +* composite filter: can now be used with filters that also add an access logger, such as the WASM filter. +* config: added stat :ref:`config_reload_time_ms `. +* connection_limit: added new :ref:`Network connection limit filter `. +* crash support: restore crash context when continuing to process requests or responses as a result of an asynchronous callback that invokes a filter directly. This is unlike the call stacks that go through the various network layers, to eventually reach the filter. For a concrete example see: ``Envoy::Extensions::HttpFilters::Cache::CacheFilter::getHeaders`` which posts a callback on the dispatcher that will invoke the filter directly. +* dns cache: added :ref:`preresolve_hostnames ` option to the DNS cache config. This option allows hostnames to be preresolved into the cache upon cache creation. This might provide performance improvement, in the form of cache hits, for hostnames that are going to be resolved during steady state and are known at config load time. +* dns cache: added :ref:`dns_query_timeout ` option to the DNS cache config. This option allows explicitly controlling the timeout of underlying queries independently of the underlying DNS platform implementation. Coupled with success and failure retry policies the use of this timeout will lead to more deterministic DNS resolution times. 
+* dns resolver: added ``DnsResolverOptions`` protobuf message to reconcile all of the DNS lookup option flags. By setting the configuration option :ref:`use_tcp_for_dns_lookups ` as true we can make the underlying dns resolver library to make only TCP queries to the DNS servers and by setting the configuration option :ref:`no_default_search_domain ` as true the DNS resolver library will not use the default search domains. +* dns resolver: added ``DnsResolutionConfig`` to combine :ref:`dns_resolver_options ` and :ref:`resolvers ` in a single protobuf message. The field ``resolvers`` can be specified with a list of DNS resolver addresses. If specified, DNS client library will perform resolution via the underlying DNS resolvers. Otherwise, the default system resolvers (e.g., /etc/resolv.conf) will be used. +* dns_filter: added :ref:`dns_resolution_config ` to aggregate all of the DNS resolver configuration in a single message. By setting the configuration option ``use_tcp_for_dns_lookups`` to true we can make dns filter's external resolvers to answer queries using TCP only, by setting the configuration option ``no_default_search_domain`` as true the DNS resolver will not use the default search domains. And by setting the configuration ``resolvers`` we can specify the external DNS servers to be used for external DNS query which replaces the pre-existing alpha api field ``upstream_resolvers``. +* dynamic_forward_proxy: added :ref:`dns_resolution_config ` option to the DNS cache config in order to aggregate all of the DNS resolver configuration in a single message. By setting one such configuration option ``no_default_search_domain`` as true the DNS resolver will not use the default search domains. And by setting the configuration ``resolvers`` we can specify the external DNS servers to be used for external DNS query instead of the system default resolvers. 
+* http: a new field ``is_optional`` is added to ``extensions.filters.network.http_connection_manager.v3.HttpFilter``. When + value is ``true``, the unsupported http filter will be ignored by envoy. This is also same with unsupported http filter + in the typed per filter config. For more information, please reference + :ref:`HttpFilter `. +* http: added :ref:`scheme options ` for adding or overwriting scheme. +* http: added :ref:`stripping trailing host dot from host header ` support. +* http: added support for :ref:`original IP detection extensions `. + Two initial extensions were added, the :ref:`custom header ` extension and the + :ref:`xff ` extension. +* http: added a new option to upstream HTTP/2 :ref:`keepalive ` to send a PING ahead of a new stream if the connection has been idle for a sufficient duration. +* http: added the ability to :ref:`unescape slash sequences ` in the path. Requests with unescaped slashes can be proxied, rejected or redirected to the new unescaped path. By default this feature is disabled. The default behavior can be overridden through :ref:`http_connection_manager.path_with_escaped_slashes_action` runtime variable. This action can be selectively enabled for a portion of requests by setting the :ref:`http_connection_manager.path_with_escaped_slashes_action_sampling` runtime variable. +* http: added upstream and downstream alpha HTTP/3 support! See :ref:`quic_options ` for downstream and the new http3_protocol_options in :ref:`http_protocol_options ` for upstream HTTP/3. +* input matcher: a new input matcher that :ref:`matches an IP address against a list of CIDR ranges `. +* jwt_authn: added support to fetch remote jwks asynchronously specified by :ref:`async_fetch `. +* jwt_authn: added support to add padding in the forwarded JWT payload specified by :ref:`pad_forward_payload_header `. +* listener: added ability to change an existing listener's address. +* listener: added filter chain match support for :ref:`direct source address `. 
+* local_rate_limit_filter: added support for locally rate limiting http requests on a per connection basis. This can be enabled by setting the :ref:`local_rate_limit_per_downstream_connection ` field to true. +* metric service: added support for sending metric tags as labels. This can be enabled by setting the :ref:`emit_tags_as_labels ` field to true. +* overload: added a new overload action that resets streams using a lot of memory. To enable the tracking of allocated bytes in buffers that a stream is using, turn on ``envoy.reloadable_features.per_stream_buffer_accounting``. Only streams using above a minimum threshold of allocated bytes are tracked in various power of two sized buckets. The minimum threshold for tracking is tunable via :ref:`buffer_factory_config `. If the overload action is triggered, we reset the most expensive stream first. +* proxy protocol: added support for generating the header while using the :ref:`HTTP connection manager `. This is done using the :ref:`Proxy Protocol Transport Socket ` on upstream clusters. + This feature is currently affected by a memory leak `issue `_. +* req_without_query: added access log formatter extension implementing command operator :ref:`REQ_WITHOUT_QUERY ` to log the request path, while excluding the query string. +* router: added flag ``suppress_grpc_request_failure_code_stats`` to :ref:`key ` to allow users to exclude incrementing HTTP status code stats on gRPC requests. +* stats: added native :ref:`Graphite-formatted tag ` support. +* tcp: added support for :ref:`preconnecting `. Preconnecting is off by default, but recommended for clusters serving latency-sensitive traffic. +* thrift_proxy: added per upstream metrics within the :ref:`thrift router ` for request and response size histograms. +* thrift_proxy: added support for :ref:`outlier detection `. +* tls: allow dual ECDSA/RSA certs via SDS. 
Previously, SDS only supported a single certificate per context, and dual cert was only supported via non-SDS. +* udp_proxy: added :ref:`key ` as another hash policy to support hash based routing on any given key. +* windows container image: added user, EnvoyUser which is part of the Network Configuration Operators group to the container image. + Deprecated ---------- * cluster: :ref:`max_requests_per_connection ` is deprecated in favor of :ref:`max_requests_per_connection `. diff --git a/envoy/buffer/buffer.h b/envoy/buffer/buffer.h index 3c22e7c78a3e2..85e762a2c1a2a 100644 --- a/envoy/buffer/buffer.h +++ b/envoy/buffer/buffer.h @@ -521,6 +521,15 @@ class WatermarkFactory { * @return a BufferMemoryAccountSharedPtr of the newly created account. */ virtual BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) PURE; + + /** + * Goes through the tracked accounts, resetting the accounts and their + * corresponding stream depending on the pressure. + * + * @param pressure scaled threshold pressure used to compute the buckets to + * reset internally. + */ + virtual void resetAccountsGivenPressure(float pressure) PURE; }; using WatermarkFactoryPtr = std::unique_ptr; diff --git a/envoy/server/overload/overload_manager.h b/envoy/server/overload/overload_manager.h index 7aa694b34f489..03f7dc32c0956 100644 --- a/envoy/server/overload/overload_manager.h +++ b/envoy/server/overload/overload_manager.h @@ -34,6 +34,9 @@ class OverloadActionNameValues { // Overload action to reduce some subset of configured timeouts. const std::string ReduceTimeouts = "envoy.overload_actions.reduce_timeouts"; + + // Overload action to reset streams using excessive memory. 
+ const std::string ResetStreams = "envoy.overload_actions.reset_streams"; }; using OverloadActionNames = ConstSingleton; diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index 65786db227ffe..c7bd3ebec8871 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -1,10 +1,13 @@ #include "source/common/buffer/watermark_buffer.h" +#include "watermark_buffer.h" +#include #include #include "envoy/buffer/buffer.h" #include "source/common/common/assert.h" +#include "source/common/common/logger.h" #include "source/common/runtime/runtime_features.h" namespace Envoy { @@ -155,31 +158,60 @@ WatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) { } void WatermarkBufferFactory::updateAccountClass(const BufferMemoryAccountSharedPtr& account, - int current_class, int new_class) { + absl::optional current_class, + absl::optional new_class) { ASSERT(current_class != new_class, "Expected the current_class and new_class to be different"); - if (current_class == -1 && new_class >= 0) { + if (!current_class.has_value() && new_class >= 0u) { // Start tracking - ASSERT(!size_class_account_sets_[new_class].contains(account)); - size_class_account_sets_[new_class].insert(account); - } else if (current_class >= 0 && new_class == -1) { + ASSERT(!size_class_account_sets_[new_class.value()].contains(account)); + size_class_account_sets_[new_class.value()].insert(account); + } else if (current_class >= 0u && !new_class.has_value()) { // No longer track - ASSERT(size_class_account_sets_[current_class].contains(account)); - size_class_account_sets_[current_class].erase(account); + ASSERT(size_class_account_sets_[current_class.value()].contains(account)); + size_class_account_sets_[current_class.value()].erase(account); } else { // Moving between buckets - ASSERT(size_class_account_sets_[current_class].contains(account)); - 
ASSERT(!size_class_account_sets_[new_class].contains(account)); - size_class_account_sets_[new_class].insert( - std::move(size_class_account_sets_[current_class].extract(account).value())); + ASSERT(size_class_account_sets_[current_class.value()].contains(account)); + ASSERT(!size_class_account_sets_[new_class.value()].contains(account)); + size_class_account_sets_[new_class.value()].insert( + std::move(size_class_account_sets_[current_class.value()].extract(account).value())); } } void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account, - int current_class) { - if (current_class >= 0) { - ASSERT(size_class_account_sets_[current_class].contains(account)); - size_class_account_sets_[current_class].erase(account); + absl::optional current_class) { + if (current_class.has_value()) { + ASSERT(size_class_account_sets_[current_class.value()].contains(account)); + size_class_account_sets_[current_class.value()].erase(account); + } +} + +void WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) { + ASSERT(pressure >= 0.0 && pressure <= 1.0, "Provided pressure is out of range [0, 1]."); + + // Compute buckets to clear + const uint32_t buckets_to_clear = std::min( + std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8); + uint32_t bucket_idx = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_to_clear; + + ENVOY_LOG_MISC(warn, "resetting streams in buckets >= {}", bucket_idx); + + // Clear buckets + while (bucket_idx < BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) { + ENVOY_LOG_MISC(warn, "resetting {} streams in bucket {}.", + size_class_account_sets_[bucket_idx].size(), bucket_idx); + + auto it = size_class_account_sets_[bucket_idx].begin(); + while (it != size_class_account_sets_[bucket_idx].end()) { + auto next = std::next(it); + // This will trigger an erase, which avoids rehashing and invalidates the + // iterator *it*. *next* is still valid. 
+ (*it)->resetDownstream(); + it = next; + } + + ++bucket_idx; } } @@ -214,20 +246,20 @@ BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory, return account; } -int BufferMemoryAccountImpl::balanceToClassIndex() { +absl::optional BufferMemoryAccountImpl::balanceToClassIndex() { static uint32_t bitshift = factory_->bitshift(); uint64_t shifted_balance = buffer_memory_allocated_ >> bitshift; if (shifted_balance == 0) { - return -1; // Not worth tracking anything < configured minimum threshold + return {}; // Not worth tracking anything < configured minimum threshold } const int class_idx = absl::bit_width(shifted_balance) - 1; - return std::min(class_idx, NUM_MEMORY_CLASSES_ - 1); + return std::min(class_idx, NUM_MEMORY_CLASSES_ - 1); } void BufferMemoryAccountImpl::updateAccountClass() { - const int new_class = balanceToClassIndex(); + auto new_class = balanceToClassIndex(); if (shared_this_ && new_class != current_bucket_idx_) { factory_->updateAccountClass(shared_this_, current_bucket_idx_, new_class); current_bucket_idx_ = new_class; @@ -251,7 +283,7 @@ void BufferMemoryAccountImpl::clearDownstream() { if (reset_handler_.has_value()) { reset_handler_.reset(); factory_->unregisterAccount(shared_this_, current_bucket_idx_); - current_bucket_idx_ = -1; + current_bucket_idx_.reset(); shared_this_ = nullptr; } } diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index d3cbb1f20f3c3..980ed3c92a689 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -92,6 +92,11 @@ class BufferMemoryAccountImpl : public BufferMemoryAccount { static BufferMemoryAccountSharedPtr createAccount(WatermarkBufferFactory* factory, Http::StreamResetHandler& reset_handler); ~BufferMemoryAccountImpl() override { + // The buffer_memory_allocated_ should always be zero on destruction, even if we + // triggered a reset of the downstream. 
This is because the dtor only will + // trigger when no entities have a pointer to the account, meaning any slices + // which charge and credit the account should have credited the account when + // they were deleted, maintaining this invariant. ASSERT(buffer_memory_allocated_ == 0); ASSERT(!reset_handler_.has_value()); } @@ -129,15 +134,13 @@ class BufferMemoryAccountImpl : public BufferMemoryAccount { // Returns the class index based off of the buffer_memory_allocated_ // This can differ with current_bucket_idx_ if buffer_memory_allocated_ was // just modified. - // The class indexes returned are based on buckets of powers of two, if the - // account is above a minimum threshold. Returned class index range is [-1, - // NUM_MEMORY_CLASSES_). - int balanceToClassIndex(); + // Returned class index, if present, is in the range [0, NUM_MEMORY_CLASSES_). + absl::optional balanceToClassIndex(); void updateAccountClass(); uint64_t buffer_memory_allocated_ = 0; // Current bucket index where the account is being tracked in. - int current_bucket_idx_ = -1; + absl::optional current_bucket_idx_{}; WatermarkBufferFactory* factory_ = nullptr; @@ -195,16 +198,19 @@ class WatermarkBufferFactory : public WatermarkFactory { } BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override; + void resetAccountsGivenPressure(float pressure) override; // Called by BufferMemoryAccountImpls created by the factory on account class // updated. - void updateAccountClass(const BufferMemoryAccountSharedPtr& account, int current_class, - int new_class); + void updateAccountClass(const BufferMemoryAccountSharedPtr& account, + absl::optional current_class, + absl::optional new_class); uint32_t bitshift() const { return bitshift_; } // Unregister a buffer memory account. 
- virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account, int current_class); + virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account, + absl::optional current_class); protected: // Enable subclasses to inspect the mapping. diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 3321e1126726b..349a5bbeafb2d 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -277,7 +277,7 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod // work-in-progress, and will be removed when other features using the // accounting are implemented. Buffer::BufferMemoryAccountSharedPtr downstream_stream_account; - if (Runtime::runtimeFeatureEnabled("envoy.test_only.per_stream_buffer_accounting")) { + if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.per_stream_buffer_accounting")) { // Create account, wiring the stream to use it. auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory(); downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream()); diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index b828b76476f19..900e740fa2de8 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -117,8 +117,8 @@ constexpr const char* disabled_runtime_features[] = { "envoy.reloadable_features.remove_legacy_json", // Sentinel and test flag. "envoy.reloadable_features.test_feature_false", - // TODO(kbaichoo): Remove when this is no longer test only. - "envoy.test_only.per_stream_buffer_accounting", + // TODO(kbaichoo): flip to true in a separate PR if we want to do accounting by default. + "envoy.reloadable_features.per_stream_buffer_accounting", // Allows the use of ExtensionWithMatcher to wrap a HTTP filter with a match tree. 
"envoy.reloadable_features.experimental_matching_api", // When the runtime is flipped to true, use shared cache in getOrCreateRawAsyncClient method if diff --git a/source/server/worker_impl.cc b/source/server/worker_impl.cc index 55fc17a281490..290e2842a07aa 100644 --- a/source/server/worker_impl.cc +++ b/source/server/worker_impl.cc @@ -35,6 +35,9 @@ WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks, overload_manager.registerForAction( OverloadActionNames::get().RejectIncomingConnections, *dispatcher_, [this](OverloadActionState state) { rejectIncomingConnectionsCb(state); }); + overload_manager.registerForAction( + OverloadActionNames::get().ResetStreams, *dispatcher_, + [this](OverloadActionState state) { resetStreamsUsingExcessiveMemory(state); }); } void WorkerImpl::addListener(absl::optional overridden_listener, @@ -148,5 +151,9 @@ void WorkerImpl::rejectIncomingConnectionsCb(OverloadActionState state) { handler_->setListenerRejectFraction(state.value()); } +void WorkerImpl::resetStreamsUsingExcessiveMemory(OverloadActionState state) { + dispatcher_->getWatermarkFactory().resetAccountsGivenPressure(state.value().value()); +} + } // namespace Server } // namespace Envoy diff --git a/source/server/worker_impl.h b/source/server/worker_impl.h index c5187c6716de3..9ea358e123398 100644 --- a/source/server/worker_impl.h +++ b/source/server/worker_impl.h @@ -58,6 +58,7 @@ class WorkerImpl : public Worker, Logger::Loggable { void threadRoutine(GuardDog& guard_dog, const Event::PostCb& cb); void stopAcceptingConnectionsCb(OverloadActionState state); void rejectIncomingConnectionsCb(OverloadActionState state); + void resetStreamsUsingExcessiveMemory(OverloadActionState state); ThreadLocal::Instance& tls_; ListenerHooks& hooks_; diff --git a/test/common/buffer/buffer_memory_account_test.cc b/test/common/buffer/buffer_memory_account_test.cc index a1c1cce2b3080..f869e8cbaa71c 100644 --- a/test/common/buffer/buffer_memory_account_test.cc +++ 
b/test/common/buffer/buffer_memory_account_test.cc @@ -1,3 +1,5 @@ +#include + #include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/http/codec.h" @@ -489,6 +491,87 @@ TEST(WatermarkBufferFactoryTest, ReleaseAssertIfAccountTrackingThresholdBytesIsN "Expected account_tracking_threshold_bytes to be a power of two."); } +TEST(WatermarkBufferFactoryTest, ShouldOnlyResetAllStreamsGreatThanOrEqualToProvidedIndex) { + TrackedWatermarkBufferFactory factory(kMinimumBalanceToTrack); + Http::MockStreamResetHandler largest_stream_to_reset; + Http::MockStreamResetHandler stream_to_reset; + Http::MockStreamResetHandler stream_that_should_not_be_reset; + + auto largest_account_to_reset = factory.createAccount(&largest_stream_to_reset); + auto account_to_reset = factory.createAccount(&stream_to_reset); + auto account_to_not_reset = factory.createAccount(&stream_that_should_not_be_reset); + + largest_account_to_reset->charge(kThresholdForFinalBucket); + account_to_reset->charge(2 * kMinimumBalanceToTrack); + account_to_not_reset->charge(kMinimumBalanceToTrack); + + // Check that all of the accounts are tracked + factory.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + EXPECT_EQ(memory_classes_to_account[0].size(), 1); + EXPECT_EQ(memory_classes_to_account[1].size(), 1); + EXPECT_EQ(memory_classes_to_account[7].size(), 1); + }); + + EXPECT_CALL(largest_stream_to_reset, resetStream(_)).WillOnce(Invoke([&]() { + largest_account_to_reset->credit(getBalance(largest_account_to_reset)); + largest_account_to_reset->clearDownstream(); + })); + + EXPECT_CALL(stream_to_reset, resetStream(_)).WillOnce(Invoke([&]() { + account_to_reset->credit(getBalance(account_to_reset)); + account_to_reset->clearDownstream(); + })); + + EXPECT_CALL(stream_that_should_not_be_reset, resetStream(_)).Times(0); + // Should call resetStream on all streams in bucket >= 1. 
+ factory.resetAccountsGivenPressure(0.85); + + account_to_not_reset->credit(kMinimumBalanceToTrack); + account_to_not_reset->clearDownstream(); +} + +TEST(WatermarkBufferFactoryTest, ComputesBucketToResetCorrectly) { + TrackedWatermarkBufferFactory factory(kMinimumBalanceToTrack); + + // Create vector of accounts and handlers + std::vector> reset_handlers; + std::vector accounts; + uint32_t seed_account_balance = kMinimumBalanceToTrack; + + for (uint32_t i = 0; i < BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_; ++i) { + reset_handlers.emplace_back(std::make_unique()); + accounts.emplace_back(factory.createAccount(reset_handlers.back().get())); + accounts.back()->charge(seed_account_balance); + seed_account_balance *= 2; + } + + // Check that all memory classes have a corresponding account + factory.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + for (auto& account_set : memory_classes_to_account) { + EXPECT_EQ(account_set.size(), 1); + } + }); + + // Reset accounts checking correct threshold + float pressure = 0.0; + const float pressure_gradation = 1.0 / BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_; + for (uint32_t i = 0; i < BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_; ++i) { + EXPECT_CALL(*reset_handlers.back(), resetStream(_)).WillOnce(Invoke([&]() { + auto current_account = accounts.back(); + current_account->credit(getBalance(current_account)); + current_account->clearDownstream(); + })); + + factory.resetAccountsGivenPressure(pressure); + + // Move onto next reset handler and account + accounts.pop_back(); + reset_handlers.pop_back(); + + pressure += pressure_gradation; + } +} + } // namespace } // namespace Buffer } // namespace Envoy diff --git a/test/integration/BUILD b/test/integration/BUILD index 08c29b1cee5d6..780d77323caa5 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -387,6 +387,7 @@ envoy_cc_test( "buffer_accounting_integration_test.cc", ], deps = [ + ":base_overload_integration_test_lib", 
":http_integration_lib", ":http_protocol_integration_lib", ":socket_interface_swap_lib", diff --git a/test/integration/buffer_accounting_integration_test.cc b/test/integration/buffer_accounting_integration_test.cc index 7eb6ec7ab6249..4cb9ec01aed9d 100644 --- a/test/integration/buffer_accounting_integration_test.cc +++ b/test/integration/buffer_accounting_integration_test.cc @@ -8,6 +8,7 @@ #include "source/common/buffer/buffer_impl.h" #include "test/integration/autonomous_upstream.h" +#include "test/integration/base_overload_integration_test.h" #include "test/integration/http_protocol_integration.h" #include "test/integration/tracked_watermark_buffer.h" #include "test/integration/utility.h" @@ -21,6 +22,9 @@ namespace Envoy { namespace { + +using testing::HasSubstr; + std::string protocolTestParamsAndBoolToString( const ::testing::TestParamInfo>& params) { return fmt::format("{}_{}", @@ -96,8 +100,9 @@ class Http2BufferWatermarksTest } protected: + // For testing purposes, track >= 4096B accounts. std::shared_ptr buffer_factory_ = - std::make_shared(1024 * 1024); // Track >= 1MB + std::make_shared(4096); bool streamBufferAccounting() { return std::get<1>(GetParam()); } @@ -391,4 +396,103 @@ TEST_P(ProtocolsBufferWatermarksTest, ResettingStreamUnregistersAccount) { } } +class Http2OverloadManagerIntegrationTest : public Http2BufferWatermarksTest, + public Envoy::BaseOverloadIntegrationTest { +protected: + void initializeOverloadManagerInBootstrap( + const envoy::config::overload::v3::OverloadAction& overload_action) { + setupOverloadManagerConfig(overload_action); + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + *bootstrap.mutable_overload_manager() = this->overload_manager_config_; + }); + } +}; + +// Run the tests using HTTP2 only since its the only protocol that's fully +// supported. +// TODO(kbaichoo): Instantiate with H3 and H1 as well when their buffers are +// bounded to accounts. 
+INSTANTIATE_TEST_SUITE_P( + IpVersions, Http2OverloadManagerIntegrationTest, + testing::Combine(testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams( + {Http::CodecType::HTTP2}, {FakeHttpConnection::Type::HTTP2})), + testing::Bool()), + protocolTestParamsAndBoolToString); + +TEST_P(Http2OverloadManagerIntegrationTest, ResetsExpensiveStreamsWhenOverloaded) { + autonomous_upstream_ = true; + autonomous_allow_incomplete_streams_ = true; + initializeOverloadManagerInBootstrap( + TestUtility::parseYaml(R"EOF( + name: "envoy.overload_actions.reset_streams" + triggers: + - name: "envoy.resource_monitors.testonly.fake_resource_monitor" + scaled: + scaling_threshold: 0.90 + saturation_threshold: 0.98 + )EOF")); + initialize(); + + // Makes us have Envoy's writes to upstream return EAGAIN + writev_matcher_->setDestinationPort(fake_upstreams_[0]->localAddress()->ip()->port()); + writev_matcher_->setWritevReturnsEgain(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + auto largest_request_response = std::move(sendRequests(1, 4096 * 4, 4096)[0]); + auto medium_request_response = std::move(sendRequests(1, 4096 * 2, 4096)[0]); + auto smallest_request_response = std::move(sendRequests(1, 4096, 4096)[0]); + + // Wait for requests to come into Envoy. + EXPECT_TRUE(buffer_factory_->waitUntilTotalBufferedExceeds(7 * 4096)); + + // Set the pressure so the overload action kicks in + updateResource(0.95); + test_server_->waitForGaugeEq("overload.envoy.overload_actions.reset_streams.scale_percent", 62); + + // Wait for the proxy to notice and take action for the overload by only + // resetting the largest stream. 
+  if (streamBufferAccounting()) {
+    test_server_->waitForCounterGe("http.config_test.downstream_rq_rx_reset", 1);
+    EXPECT_TRUE(largest_request_response->waitForReset());
+    EXPECT_TRUE(largest_request_response->reset());
+
+    ASSERT_FALSE(medium_request_response->complete());
+  }
+
+  // Increase resource pressure to reset the medium request
+  updateResource(0.96);
+
+  // Wait for the proxy to notice and take action for the overload.
+  if (streamBufferAccounting()) {
+    test_server_->waitForCounterGe("http.config_test.downstream_rq_rx_reset", 2);
+    EXPECT_TRUE(medium_request_response->waitForReset());
+    EXPECT_TRUE(medium_request_response->reset());
+
+    ASSERT_FALSE(smallest_request_response->complete());
+  }
+
+  // Reduce resource pressure
+  updateResource(0.80);
+  test_server_->waitForGaugeEq("overload.envoy.overload_actions.reset_streams.scale_percent", 0);
+
+  // Resume writes to upstream, any request streams that survive can go through.
+  writev_matcher_->setResumeWrites();
+
+  if (!streamBufferAccounting()) {
+    // If we're not doing the accounting, we didn't end up resetting these
+    // streams.
+    ASSERT_TRUE(largest_request_response->waitForEndStream());
+    ASSERT_TRUE(largest_request_response->complete());
+    EXPECT_EQ(largest_request_response->headers().getStatusValue(), "200");
+
+    ASSERT_TRUE(medium_request_response->waitForEndStream());
+    ASSERT_TRUE(medium_request_response->complete());
+    EXPECT_EQ(medium_request_response->headers().getStatusValue(), "200");
+  }
+
+  ASSERT_TRUE(smallest_request_response->waitForEndStream());
+  ASSERT_TRUE(smallest_request_response->complete());
+  EXPECT_EQ(smallest_request_response->headers().getStatusValue(), "200");
+}
+
 } // namespace Envoy
diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc
index 8d5dabce1ad0c..a0b41f339c8f5 100644
--- a/test/integration/tracked_watermark_buffer.cc
+++ b/test/integration/tracked_watermark_buffer.cc
@@ -92,7 +92,7 @@ TrackedWatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_han
 }
 
 void TrackedWatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account,
-                                                      int current_class) {
+                                                      absl::optional current_class) {
   WatermarkBufferFactory::unregisterAccount(account, current_class);
   absl::MutexLock lock(&mutex_);
   ++total_accounts_unregistered_;
diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h
index d308f8c53f8ad..3e70045dbaa3a 100644
--- a/test/integration/tracked_watermark_buffer.h
+++ b/test/integration/tracked_watermark_buffer.h
@@ -70,7 +70,8 @@ class TrackedWatermarkBufferFactory : public WatermarkBufferFactory {
                                  std::function above_high_watermark,
                                  std::function above_overflow_watermark) override;
   BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override;
-  void unregisterAccount(const BufferMemoryAccountSharedPtr& account, int current_class) override;
+  void unregisterAccount(const BufferMemoryAccountSharedPtr& account,
+                         absl::optional current_class) override;
 
   // Number of buffers created.
   uint64_t numBuffersCreated() const;
diff --git a/test/mocks/buffer/mocks.h b/test/mocks/buffer/mocks.h
index c3579a3b8d7d4..438ff411a27be 100644
--- a/test/mocks/buffer/mocks.h
+++ b/test/mocks/buffer/mocks.h
@@ -89,6 +89,7 @@ class MockBufferFactory : public Buffer::WatermarkFactory {
                std::function above_overflow));
 
   MOCK_METHOD(Buffer::BufferMemoryAccountSharedPtr, createAccount, (Http::StreamResetHandler&));
+  MOCK_METHOD(void, resetAccountsGivenPressure, (float));
 };
 
 MATCHER_P(BufferEqual, rhs, testing::PrintToString(*rhs)) {
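
The thresholds used in `ResetsExpensiveStreamsWhenOverloaded` can be checked by hand. What follows is a minimal C++ sketch, not Envoy's implementation. It assumes the linear `scaled` trigger and the eight power-of-two buckets described in the overload manager docs, and it assumes each stream's accounted bytes are close to its request body size. Every function name here (`scaledState`, `scalePercent`, `bucketsToReset`, `bucketFor`, `wouldReset`) is hypothetical. The state is narrowed to `float`, mirroring the `float` parameter of `resetAccountsGivenPressure` in the mock, so that values such as 0.625 fall exactly on a gradation boundary.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>

// Hypothetical helpers for illustration only; none of these are Envoy APIs.
constexpr int kNumBuckets = 8;

// Linear state of a `scaled` trigger: 0 at or below scaling_threshold,
// 1 at or above saturation_threshold, linear in between.
float scaledState(double pressure, double scaling, double saturation) {
  assert(saturation > scaling);
  if (pressure <= scaling) {
    return 0.0f;
  }
  if (pressure >= saturation) {
    return 1.0f;
  }
  return static_cast<float>((pressure - scaling) / (saturation - scaling));
}

// Integer percentage, truncated, as an assumed model of the scale_percent gauge.
int scalePercent(double pressure, double scaling, double saturation) {
  return static_cast<int>(scaledState(pressure, scaling, saturation) * 100.0f);
}

// Number of buckets (counted from the largest) that get reset. Nothing is
// reset below scaling_threshold; each 1/8 gradation adds one more bucket.
int bucketsToReset(double pressure, double scaling, double saturation) {
  if (pressure < scaling) {
    return 0;
  }
  const float state = scaledState(pressure, scaling, saturation);
  return std::min(static_cast<int>(std::floor(state * kNumBuckets)) + 1, kNumBuckets);
}

// Bucket i holds [min * 2^i, min * 2^(i+1)); the last bucket holds everything
// >= 128 * min. Returns -1 for streams below the tracking threshold.
int bucketFor(uint64_t bytes, uint64_t min_threshold) {
  if (bytes < min_threshold) {
    return -1;
  }
  int bucket = 0;
  uint64_t upper = min_threshold * 2;
  while (bucket < kNumBuckets - 1 && bytes >= upper) {
    ++bucket;
    upper *= 2;
  }
  return bucket;
}

// True if a stream holding `bytes` falls in one of the buckets being reset.
bool wouldReset(uint64_t bytes, uint64_t min_threshold, double pressure, double scaling,
                double saturation) {
  const int bucket = bucketFor(bytes, min_threshold);
  const int n = bucketsToReset(pressure, scaling, saturation);
  return bucket >= 0 && bucket >= kNumBuckets - n;
}
```

Under these assumptions, pressure 0.95 with thresholds 0.90 and 0.98 gives a state of 0.625 (gauge value 62) and resets buckets 2 through 7, which covers only the 16 KiB stream. Raising pressure to 0.96 gives 0.75 and extends the reset to bucket 1, the 8 KiB stream, while the 4 KiB stream in bucket 0 survives.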