diff --git a/docs/root/configuration/operations/overload_manager/overload_manager.rst b/docs/root/configuration/operations/overload_manager/overload_manager.rst index 7f034f086151a..12ae610921c09 100644 --- a/docs/root/configuration/operations/overload_manager/overload_manager.rst +++ b/docs/root/configuration/operations/overload_manager/overload_manager.rst @@ -234,12 +234,19 @@ streams based on heap usage as a trigger. When the heap usage is less than 85%, no streams will be reset. When heap usage is at or above 85%, we start to reset buckets according to the strategy described below. When the heap usage is at 95% all streams using >= 1MiB memory are eligible for reset. +This overload action will reset up to 50 streams (this is a hardcoded limit) +per worker every time the action is invoked. This is both to reduce the number +of streams that end up getting reset and to prevent the worker thread from +locking up and triggering the Watchdog system. Given that there are only 8 buckets, we partition the space with a gradation of :math:`gradation = (saturation_threshold - scaling_threshold)/8`. Hence at 85% heap usage we reset streams in the last bucket e.g. those using `>= 128MiB`. At :math:`85% + 1 * gradation` heap usage we reset streams in the last two buckets -e.g. those using `>= 64MiB`. And so forth as the heap usage is higher. +e.g. those using `>= 64MiB`, prioritizing the streams in the last bucket since +there's a hard limit on the number of streams we can reset per invocation. +At :math:`85% + 2 * gradation` heap usage we reset streams in the last three +buckets e.g. those using `>= 32MiB`. And so forth as the heap usage is higher. It's expected that the first few gradations shouldn't trigger anything, unless there's something seriously wrong e.g. 
in this example streams using `>= diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index 734fed3fb0443..10ab4500f77bf 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -16,6 +16,9 @@ namespace { // Effectively disables tracking as this should zero out all reasonable account // balances when shifted by this amount. constexpr uint32_t kEffectivelyDisableTrackingBitshift = 63; +// 50 is an arbitrary limit, and is meant to both limit the number of streams +// Envoy ends up resetting and avoid triggering the Watchdog system. +constexpr uint32_t kMaxNumberOfStreamsToResetPerInvocation = 50; } // end namespace void WatermarkBuffer::add(const void* data, uint64_t size) { @@ -194,19 +197,23 @@ uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) { // Compute buckets to clear const uint32_t buckets_to_clear = std::min( std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8); - uint32_t bucket_idx = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_to_clear; - - ENVOY_LOG_MISC(warn, "resetting streams in buckets >= {}", bucket_idx); - uint64_t num_streams_reset = 0; - // TODO(kbaichoo): Add a limit to the number of streams we reset - // per-invocation of this function. - // Clear buckets - while (bucket_idx < BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) { + + uint32_t last_bucket_to_clear = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_to_clear; + ENVOY_LOG_MISC(warn, "resetting streams in buckets >= {}", last_bucket_to_clear); + + // Clear buckets, prioritizing the buckets with larger streams. 
+ uint32_t num_streams_reset = 0; + for (uint32_t buckets_cleared = 0; buckets_cleared < buckets_to_clear; ++buckets_cleared) { + const uint32_t bucket_to_clear = + BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_cleared - 1; ENVOY_LOG_MISC(warn, "resetting {} streams in bucket {}.", - size_class_account_sets_[bucket_idx].size(), bucket_idx); + size_class_account_sets_[bucket_to_clear].size(), bucket_to_clear); - auto it = size_class_account_sets_[bucket_idx].begin(); - while (it != size_class_account_sets_[bucket_idx].end()) { + auto it = size_class_account_sets_[bucket_to_clear].begin(); + while (it != size_class_account_sets_[bucket_to_clear].end()) { + if (num_streams_reset >= kMaxNumberOfStreamsToResetPerInvocation) { + return num_streams_reset; + } auto next = std::next(it); // This will trigger an erase, which avoids rehashing and invalidates the // iterator *it*. *next* is still valid. @@ -214,8 +221,6 @@ uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) { it = next; ++num_streams_reset; } - - ++bucket_idx; } return num_streams_reset; diff --git a/test/common/buffer/buffer_memory_account_test.cc b/test/common/buffer/buffer_memory_account_test.cc index a651eca14dffa..b6b6a20193717 100644 --- a/test/common/buffer/buffer_memory_account_test.cc +++ b/test/common/buffer/buffer_memory_account_test.cc @@ -22,6 +22,7 @@ using MemoryClassesToAccountsSet = std::array()), + account_(factory.createAccount(*reset_handler_)) {} + + void expectResetStream() { + EXPECT_CALL(*reset_handler_, resetStream(_)).WillOnce([this](Http::StreamResetReason) { + account_->credit(getBalance(account_)); + account_->clearDownstream(); + reset_handler_invoked_ = true; + }); + } + + std::unique_ptr reset_handler_; + bool reset_handler_invoked_{false}; + BufferMemoryAccountSharedPtr account_; +}; + +using AccountWithResetHandlerPtr = std::unique_ptr; + +TEST(WatermarkBufferFactoryTest, + LimitsNumberOfStreamsResetPerInvocationOfResetAccountsGivenPressure) { 
+ TrackedWatermarkBufferFactory factory(absl::bit_width(kMinimumBalanceToTrack)); + + std::vector accounts_to_reset; + for (int i = 0; i < 2 * kMaxStreamsResetPerCall; ++i) { + accounts_to_reset.push_back(std::make_unique(factory)); + accounts_to_reset.back()->account_->charge(kThresholdForFinalBucket); + accounts_to_reset.back()->expectResetStream(); + } + + // Assert accounts tracked. + factory.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + ASSERT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + 2 * kMaxStreamsResetPerCall); + }); + + // We should only reset up to the max number of streams that should be reset. + int streams_reset = 0; + EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall); + for (const auto& account : accounts_to_reset) { + if (account->reset_handler_invoked_) { + ++streams_reset; + } + } + + EXPECT_EQ(streams_reset, kMaxStreamsResetPerCall); + + // Subsequent call to reset the remaining streams. + EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall); + for (const auto& account : accounts_to_reset) { + EXPECT_TRUE(account->reset_handler_invoked_); + } +} + +// Tests that of the eligible streams to reset, we start resetting the largest +// streams. 
+TEST(WatermarkBufferFactoryTest, + ShouldPrioritizeResettingTheLargestEligibleStreamsPerInvocationOfResetAccountGivenPressure) { + TrackedWatermarkBufferFactory factory(absl::bit_width(kMinimumBalanceToTrack)); + + std::vector accounts_reset_in_first_batch; + for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + accounts_reset_in_first_batch.push_back(std::make_unique(factory)); + accounts_reset_in_first_batch.back()->account_->charge(kThresholdForFinalBucket); + accounts_reset_in_first_batch.back()->expectResetStream(); + } + + std::vector accounts_reset_in_second_batch; + for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + accounts_reset_in_second_batch.push_back(std::make_unique(factory)); + accounts_reset_in_second_batch.back()->account_->charge(kMinimumBalanceToTrack); + } + + // Assert accounts tracked. + factory.inspectMemoryClasses([](MemoryClassesToAccountsSet& memory_classes_to_account) { + ASSERT_EQ(memory_classes_to_account[0].size(), kMaxStreamsResetPerCall); + ASSERT_EQ(memory_classes_to_account[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1].size(), + kMaxStreamsResetPerCall); + }); + + // All buckets are eligible for having streams reset given the pressure. + // However we will hit the maximum number to reset per call and shouldn't + // have any in the second batch reset. + EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall); + for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + EXPECT_TRUE(accounts_reset_in_first_batch[i]->reset_handler_invoked_); + EXPECT_FALSE(accounts_reset_in_second_batch[i]->reset_handler_invoked_); + } + + // Subsequent call should get those in the second batch. 
+ for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + accounts_reset_in_second_batch[i]->expectResetStream(); + } + EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall); + for (int i = 0; i < kMaxStreamsResetPerCall; ++i) { + EXPECT_TRUE(accounts_reset_in_second_batch[i]->reset_handler_invoked_); + } +} + } // namespace } // namespace Buffer } // namespace Envoy