Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,19 @@ streams based on heap usage as a trigger. When the heap usage is less than 85%,
no streams will be reset. When heap usage is at or above 85%, we start to
reset buckets according to the strategy described below. When the heap
usage is at 95% all streams using >= 1MiB memory are eligible for reset.
This overload action will reset up to 50 streams (this is a hardcoded limit)
per worker every time the action is invoked. This is both to reduce the number
of streams that end up getting reset and to prevent the worker thread from
locking up and triggering the Watchdog system.

Given that there are only 8 buckets, we partition the space with a gradation of
:math:`gradation = (saturation_threshold - scaling_threshold)/8`. Hence at 85%
heap usage we reset streams in the last bucket e.g. those using `>= 128MiB`. At
:math:`85% + 1 * gradation` heap usage we reset streams in the last two buckets
e.g. those using `>= 64MiB`. And so forth as the heap usage is higher.
e.g. those using `>= 64MiB`, prioritizing the streams in the last bucket since
there's a hard limit on the number of streams we can reset per invocation.
At :math:`85% + 2 * gradation` heap usage we reset streams in the last three
buckets e.g. those using `>= 32MiB`. And so forth as the heap usage is higher.

It's expected that the first few gradations shouldn't trigger anything, unless
there's something seriously wrong e.g. in this example streams using `>=
Expand Down
31 changes: 18 additions & 13 deletions source/common/buffer/watermark_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ namespace {
// Effectively disables tracking as this should zero out all reasonable account
// balances when shifted by this amount.
constexpr uint32_t kEffectivelyDisableTrackingBitshift = 63;
// 50 is an arbitrary limit, and is meant to both limit the number of streams
// Envoy ends up resetting and avoid triggering the Watchdog system.
constexpr uint32_t kMaxNumberOfStreamsToResetPerInvocation = 50;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd comment here as well that 50 is arbitrary, to limit work done (as you doc up above)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} // end namespace

void WatermarkBuffer::add(const void* data, uint64_t size) {
Expand Down Expand Up @@ -194,28 +197,30 @@ uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) {
// Compute buckets to clear
const uint32_t buckets_to_clear = std::min<uint32_t>(
std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8);
uint32_t bucket_idx = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_to_clear;

ENVOY_LOG_MISC(warn, "resetting streams in buckets >= {}", bucket_idx);
uint64_t num_streams_reset = 0;
// TODO(kbaichoo): Add a limit to the number of streams we reset
// per-invocation of this function.
// Clear buckets
while (bucket_idx < BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) {

uint32_t last_bucket_to_clear = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_to_clear;
ENVOY_LOG_MISC(warn, "resetting streams in buckets >= {}", last_bucket_to_clear);

// Clear buckets, prioritizing the buckets with larger streams.
uint32_t num_streams_reset = 0;
for (uint32_t buckets_cleared = 0; buckets_cleared < buckets_to_clear; ++buckets_cleared) {
const uint32_t bucket_to_clear =
BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_cleared - 1;
ENVOY_LOG_MISC(warn, "resetting {} streams in bucket {}.",
size_class_account_sets_[bucket_idx].size(), bucket_idx);
size_class_account_sets_[bucket_to_clear].size(), bucket_to_clear);

auto it = size_class_account_sets_[bucket_idx].begin();
while (it != size_class_account_sets_[bucket_idx].end()) {
auto it = size_class_account_sets_[bucket_to_clear].begin();
while (it != size_class_account_sets_[bucket_to_clear].end()) {
if (num_streams_reset >= kMaxNumberOfStreamsToResetPerInvocation) {
return num_streams_reset;
}
auto next = std::next(it);
// This will trigger an erase, which avoids rehashing and invalidates the
// iterator *it*. *next* is still valid.
(*it)->resetDownstream();
it = next;
++num_streams_reset;
}

++bucket_idx;
}

return num_streams_reset;
Expand Down
104 changes: 104 additions & 0 deletions test/common/buffer/buffer_memory_account_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ using MemoryClassesToAccountsSet = std::array<absl::flat_hash_set<BufferMemoryAc

// Balance (1 MiB) that the tests charge to place an account in the first memory
// class; also fed through absl::bit_width() when constructing the factory.
constexpr uint64_t kMinimumBalanceToTrack = 1024 * 1024;
// Balance (128 MiB) that the tests charge to place an account in the last
// memory class.
constexpr uint64_t kThresholdForFinalBucket = 128 * 1024 * 1024;
// Number of streams the tests expect a single resetAccountsGivenPressure() call
// to reset at most; same value (50) as kMaxNumberOfStreamsToResetPerInvocation
// in watermark_buffer.cc.
constexpr int kMaxStreamsResetPerCall = 50;

// Gets the balance of an account assuming it's a BufferMemoryAccountImpl.
static int getBalance(const BufferMemoryAccountSharedPtr& account) {
Expand Down Expand Up @@ -573,6 +574,109 @@ TEST(WatermarkBufferFactoryTest, ComputesBucketToResetCorrectly) {
}
}

// Bundles a buffer memory account with the mock StreamResetHandler it was
// created against, and records whether that handler's resetStream() has run.
struct AccountWithResetHandler {
  // Builds the mock handler, then asks `factory` for an account bound to it.
  // Marked explicit so a factory reference never converts to this type
  // implicitly.
  explicit AccountWithResetHandler(WatermarkBufferFactory& factory)
      : reset_handler_(std::make_unique<Http::MockStreamResetHandler>()),
        account_(factory.createAccount(*reset_handler_)) {}

  // Arms the mock to expect exactly one resetStream() call. When it fires, the
  // action credits the account by its current balance (per getBalance()),
  // calls clearDownstream() on it, and flags the invocation.
  //
  // The action captures `this`, so the object must stay at the same address
  // for as long as the expectation is live; the tests hold instances through
  // std::unique_ptr for that reason.
  void expectResetStream() {
    EXPECT_CALL(*reset_handler_, resetStream(_)).WillOnce([this](Http::StreamResetReason) {
      account_->credit(getBalance(account_));
      account_->clearDownstream();
      reset_handler_invoked_ = true;
    });
  }

  // Declared before account_ so that it is initialized first: the account is
  // created from a reference to this handler.
  std::unique_ptr<Http::MockStreamResetHandler> reset_handler_;
  // Set to true by the action installed in expectResetStream().
  bool reset_handler_invoked_{false};
  BufferMemoryAccountSharedPtr account_;
};

using AccountWithResetHandlerPtr = std::unique_ptr<AccountWithResetHandler>;

// Checks that one call to resetAccountsGivenPressure() resets no more than
// kMaxStreamsResetPerCall streams, and that a second call resets the rest.
TEST(WatermarkBufferFactoryTest,
     LimitsNumberOfStreamsResetPerInvocationOfResetAccountsGivenPressure) {
  TrackedWatermarkBufferFactory factory(absl::bit_width(kMinimumBalanceToTrack));

  // Fill the final bucket with twice as many accounts as one call may reset.
  // Every account expects to be reset exactly once over the whole test.
  std::vector<AccountWithResetHandlerPtr> accounts_to_reset;
  for (int remaining = 2 * kMaxStreamsResetPerCall; remaining > 0; --remaining) {
    accounts_to_reset.push_back(std::make_unique<AccountWithResetHandler>(factory));
    AccountWithResetHandler& newest = *accounts_to_reset.back();
    newest.account_->charge(kThresholdForFinalBucket);
    newest.expectResetStream();
  }

  // Sanity check: all of the accounts are tracked in the last memory class.
  factory.inspectMemoryClasses([](MemoryClassesToAccountsSet& classes) {
    const auto& final_bucket = classes[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1];
    ASSERT_EQ(final_bucket.size(), 2 * kMaxStreamsResetPerCall);
  });

  // First call: only the per-call maximum may be reset.
  EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall);

  int streams_reset = 0;
  for (const auto& entry : accounts_to_reset) {
    streams_reset += entry->reset_handler_invoked_ ? 1 : 0;
  }
  EXPECT_EQ(streams_reset, kMaxStreamsResetPerCall);

  // Second call: the remaining accounts are reset, so every handler has fired.
  EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall);
  for (const auto& entry : accounts_to_reset) {
    EXPECT_TRUE(entry->reset_handler_invoked_);
  }
}

// When more streams are eligible than a single call may reset, the streams in
// the largest bucket must be reset first; smaller ones wait for a later call.
TEST(WatermarkBufferFactoryTest,
     ShouldPrioritizeResettingTheLargestEligibleStreamsPerInvocationOfResetAccountGivenPressure) {
  TrackedWatermarkBufferFactory factory(absl::bit_width(kMinimumBalanceToTrack));

  // Large accounts: land in the final bucket and expect a reset right away.
  std::vector<AccountWithResetHandlerPtr> accounts_reset_in_first_batch;
  for (int remaining = kMaxStreamsResetPerCall; remaining > 0; --remaining) {
    accounts_reset_in_first_batch.push_back(std::make_unique<AccountWithResetHandler>(factory));
    AccountWithResetHandler& large = *accounts_reset_in_first_batch.back();
    large.account_->charge(kThresholdForFinalBucket);
    large.expectResetStream();
  }

  // Small accounts: land in the first bucket. No reset expectation is installed
  // yet, since the first call must not touch them.
  std::vector<AccountWithResetHandlerPtr> accounts_reset_in_second_batch;
  for (int remaining = kMaxStreamsResetPerCall; remaining > 0; --remaining) {
    accounts_reset_in_second_batch.push_back(std::make_unique<AccountWithResetHandler>(factory));
    AccountWithResetHandler& small = *accounts_reset_in_second_batch.back();
    small.account_->charge(kMinimumBalanceToTrack);
  }

  // Sanity check: each batch is tracked in the bucket it was charged into.
  factory.inspectMemoryClasses([](MemoryClassesToAccountsSet& classes) {
    const auto& first_bucket = classes[0];
    const auto& final_bucket = classes[BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - 1];
    ASSERT_EQ(first_bucket.size(), kMaxStreamsResetPerCall);
    ASSERT_EQ(final_bucket.size(), kMaxStreamsResetPerCall);
  });

  // At this pressure every bucket is eligible, but the per-call cap is reached
  // within the final bucket, so none of the small accounts should be reset.
  EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall);
  for (int idx = 0; idx != kMaxStreamsResetPerCall; ++idx) {
    const AccountWithResetHandler& large = *accounts_reset_in_first_batch[idx];
    const AccountWithResetHandler& small = *accounts_reset_in_second_batch[idx];
    EXPECT_TRUE(large.reset_handler_invoked_);
    EXPECT_FALSE(small.reset_handler_invoked_);
  }

  // A follow-up call should now reach the small accounts.
  for (const auto& small : accounts_reset_in_second_batch) {
    small->expectResetStream();
  }
  EXPECT_EQ(factory.resetAccountsGivenPressure(1.0), kMaxStreamsResetPerCall);
  for (const auto& small : accounts_reset_in_second_batch) {
    EXPECT_TRUE(small->reset_handler_invoked_);
  }
}

} // namespace
} // namespace Buffer
} // namespace Envoy