-
Notifications
You must be signed in to change notification settings - Fork 5.5k
LocalRateLimit(HTTP): Add dynamic token bucket support #36623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 45 commits
6ff6a4b
71ecf21
365e7c4
cde6760
e1dc1ac
f1e4765
15f655e
9d251f8
ed02fa6
d305bd5
53e84bb
bb34179
b8fa581
0966f09
e9690d4
0b6eb05
d0b0433
3463d28
25aa2ab
0a3df60
eb50e85
80a770b
9a2448e
04eadba
210f269
e697169
44eefb0
c3b36d7
97455e8
8c2a558
ff7d24d
78d3be0
039f452
d1d70ca
c7401eb
a439c5d
eaa9c95
db7a666
538d597
e46f681
605ef6e
5527817
2bff228
894a264
45496e7
eaaf2d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| #include <chrono> | ||
| #include <cmath> | ||
| #include <memory> | ||
|
|
||
| #include "envoy/runtime/runtime.h" | ||
|
|
||
|
|
@@ -80,7 +81,8 @@ RateLimitTokenBucket::RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_ | |
| TimeSource& time_source) | ||
| : token_bucket_(max_tokens, time_source, | ||
| // Calculate the fill rate in tokens per second. | ||
| tokens_per_fill / std::chrono::duration<double>(fill_interval).count()) {} | ||
| tokens_per_fill / std::chrono::duration<double>(fill_interval).count()), | ||
| fill_interval_(fill_interval) {} | ||
|
|
||
| bool RateLimitTokenBucket::consume(double factor, uint64_t to_consume) { | ||
| ASSERT(!(factor <= 0.0 || factor > 1.0)); | ||
|
|
@@ -93,10 +95,10 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( | |
| const uint64_t tokens_per_fill, Event::Dispatcher& dispatcher, | ||
| const Protobuf::RepeatedPtrField< | ||
| envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors, | ||
| bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider) | ||
| bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider, | ||
| uint32_t lru_size) | ||
| : time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)), | ||
| always_consume_default_token_bucket_(always_consume_default_token_bucket) { | ||
|
|
||
| // Ignore the default token bucket if fill_interval is 0 because 0 fill_interval means nothing | ||
| // and has undefined behavior. | ||
| if (fill_interval.count() > 0) { | ||
|
|
@@ -109,8 +111,12 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( | |
|
|
||
| for (const auto& descriptor : descriptors) { | ||
| RateLimit::LocalDescriptor new_descriptor; | ||
| bool wildcard_found = false; | ||
| new_descriptor.entries_.reserve(descriptor.entries_size()); | ||
| for (const auto& entry : descriptor.entries()) { | ||
| if (entry.value().empty()) { | ||
| wildcard_found = true; | ||
| } | ||
| new_descriptor.entries_.push_back({entry.key(), entry.value()}); | ||
| } | ||
|
|
||
|
|
@@ -126,11 +132,17 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( | |
| throw EnvoyException("local rate limit descriptor token bucket fill timer must be >= 50ms"); | ||
| } | ||
|
|
||
| if (wildcard_found) { | ||
| DynamicDescriptorSharedPtr dynamic_descriptor = std::make_shared<DynamicDescriptor>( | ||
| per_descriptor_max_tokens, per_descriptor_tokens_per_fill, per_descriptor_fill_interval, | ||
| lru_size, dispatcher.timeSource()); | ||
| dynamic_descriptors_.addDescriptor(std::move(new_descriptor), std::move(dynamic_descriptor)); | ||
| continue; | ||
| } | ||
| RateLimitTokenBucketSharedPtr per_descriptor_token_bucket = | ||
| std::make_shared<RateLimitTokenBucket>(per_descriptor_max_tokens, | ||
| per_descriptor_tokens_per_fill, | ||
| per_descriptor_fill_interval, time_source_); | ||
|
|
||
| auto result = | ||
| descriptors_.emplace(std::move(new_descriptor), std::move(per_descriptor_token_bucket)); | ||
| if (!result.second) { | ||
|
|
@@ -143,12 +155,12 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( | |
| LocalRateLimiterImpl::~LocalRateLimiterImpl() = default; | ||
|
|
||
| struct MatchResult { | ||
| std::reference_wrapper<RateLimitTokenBucket> token_bucket; | ||
| RateLimitTokenBucketSharedPtr token_bucket; | ||
| std::reference_wrapper<const RateLimit::Descriptor> request_descriptor; | ||
| }; | ||
|
|
||
| LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( | ||
| absl::Span<const RateLimit::Descriptor> request_descriptors) const { | ||
| LocalRateLimiterImpl::Result | ||
| LocalRateLimiterImpl::requestAllowed(absl::Span<const RateLimit::Descriptor> request_descriptors) { | ||
|
|
||
| // In most cases the request descriptors has only few elements. We use a inlined vector to | ||
| // avoid heap allocation. | ||
|
|
@@ -158,55 +170,159 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( | |
| for (const auto& request_descriptor : request_descriptors) { | ||
| auto iter = descriptors_.find(request_descriptor); | ||
| if (iter != descriptors_.end()) { | ||
| matched_results.push_back(MatchResult{*iter->second, request_descriptor}); | ||
| matched_results.push_back(MatchResult{iter->second, request_descriptor}); | ||
| } else { | ||
| auto token_bucket = dynamic_descriptors_.getBucket(request_descriptor); | ||
| if (token_bucket != nullptr) { | ||
| matched_results.push_back(MatchResult{token_bucket, request_descriptor}); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (matched_results.size() > 1) { | ||
| // Sort the matched descriptors by token bucket fill rate to ensure the descriptor with the | ||
| // smallest fill rate is consumed first. | ||
| std::sort(matched_results.begin(), matched_results.end(), [](const auto& lhs, const auto& rhs) { | ||
| return lhs.token_bucket.get().fillRate() < rhs.token_bucket.get().fillRate(); | ||
| return lhs.token_bucket->fillRate() < rhs.token_bucket->fillRate(); | ||
| }); | ||
| } | ||
|
|
||
| const double share_factor = | ||
| share_provider_ != nullptr ? share_provider_->getTokensShareFactor() : 1.0; | ||
|
|
||
| // See if the request is forbidden by any of the matched descriptors. | ||
| for (auto match_result : matched_results) { | ||
| if (!match_result.token_bucket.get().consume( | ||
| for (const auto& match_result : matched_results) { | ||
| if (!match_result.token_bucket->consume( | ||
| share_factor, match_result.request_descriptor.get().hits_addend_.value_or(1))) { | ||
| // If the request is forbidden by a descriptor, return the result and the descriptor | ||
| // token bucket. | ||
| return {false, makeOptRef<TokenBucketContext>(match_result.token_bucket.get())}; | ||
| return {false, std::shared_ptr<TokenBucketContext>(match_result.token_bucket)}; | ||
| } | ||
| ENVOY_LOG(trace, | ||
| "request allowed by descriptor with fill rate: {}, maxToken: {}, remainingToken {}", | ||
| match_result.token_bucket->fillRate(), match_result.token_bucket->maxTokens(), | ||
| match_result.token_bucket->remainingTokens()); | ||
| } | ||
|
|
||
| // See if the request is forbidden by the default token bucket. | ||
| if (matched_results.empty() || always_consume_default_token_bucket_) { | ||
| if (default_token_bucket_ == nullptr) { | ||
| return {true, matched_results.empty() | ||
| ? makeOptRefFromPtr<TokenBucketContext>(nullptr) | ||
| : makeOptRef<TokenBucketContext>(matched_results[0].token_bucket.get())}; | ||
| ? std::shared_ptr<TokenBucketContext>(nullptr) | ||
| : std::shared_ptr<TokenBucketContext>(matched_results[0].token_bucket)}; | ||
| } | ||
| ASSERT(default_token_bucket_ != nullptr); | ||
|
|
||
| if (const bool result = default_token_bucket_->consume(share_factor); !result) { | ||
| // If the request is forbidden by the default token bucket, return the result and the | ||
| // default token bucket. | ||
| return {false, makeOptRefFromPtr<TokenBucketContext>(default_token_bucket_.get())}; | ||
| return {false, std::shared_ptr<TokenBucketContext>(default_token_bucket_)}; | ||
| } | ||
|
|
||
| // If the request is allowed then return the result the token bucket. The descriptor | ||
| // token bucket will be selected as priority if it exists. | ||
| return {true, makeOptRef<TokenBucketContext>(matched_results.empty() | ||
| ? *default_token_bucket_ | ||
| : matched_results[0].token_bucket.get())}; | ||
| return {true, | ||
| matched_results.empty() ? default_token_bucket_ : matched_results[0].token_bucket}; | ||
| }; | ||
|
|
||
| ASSERT(!matched_results.empty()); | ||
| return {true, makeOptRef<TokenBucketContext>(matched_results[0].token_bucket.get())}; | ||
| std::shared_ptr<TokenBucketContext> bucket_context = | ||
| std::shared_ptr<TokenBucketContext>(matched_results[0].token_bucket); | ||
| return {true, bucket_context}; | ||
| } | ||
|
|
||
| // Compare the request descriptor entries with the user descriptor entries. If all non-empty user | ||
| // descriptor values match the request descriptor values, return true | ||
| bool DynamicDescriptorMap::matchDescriptorEntries( | ||
| const std::vector<RateLimit::DescriptorEntry>& request_entries, | ||
| const std::vector<RateLimit::DescriptorEntry>& config_entries) { | ||
| // Check for equality of sizes | ||
| if (request_entries.size() != config_entries.size()) { | ||
| return false; | ||
| } | ||
|
|
||
| for (size_t i = 0; i < request_entries.size(); ++i) { | ||
| // Check if the keys are equal. | ||
| if (request_entries[i].key_ != config_entries[i].key_) { | ||
| return false; | ||
| } | ||
|
|
||
| // Check values are equal or wildcard value is used. | ||
| if (config_entries[i].value_.empty()) { | ||
| continue; | ||
| } | ||
| if (request_entries[i].value_ != config_entries[i].value_) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| void DynamicDescriptorMap::addDescriptor(const RateLimit::LocalDescriptor& user_descriptor, | ||
| DynamicDescriptorSharedPtr dynamic_descriptor) { | ||
| auto result = user_descriptors_.emplace(user_descriptor, std::move(dynamic_descriptor)); | ||
| if (!result.second) { | ||
| throw EnvoyException(absl::StrCat("duplicate descriptor in the local rate descriptor: ", | ||
| result.first->first.toString())); | ||
| } | ||
| } | ||
|
|
||
| RateLimitTokenBucketSharedPtr | ||
| DynamicDescriptorMap::getBucket(const RateLimit::Descriptor request_descriptor) { | ||
| for (const auto& pair : user_descriptors_) { | ||
| auto user_descriptor = pair.first; | ||
| if (!matchDescriptorEntries(request_descriptor.entries_, user_descriptor.entries_)) { | ||
| continue; | ||
| } | ||
|
|
||
| // we found a user configured wildcard descriptor that matches the request descriptor. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please all check all these comments match our style.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have updated this specific comment. Is there a doc/readme that have guidelines about the comment styles for my reference? |
||
| return pair.second->addOrGetDescriptor(request_descriptor); | ||
| } | ||
| return nullptr; | ||
| } | ||
|
|
||
| DynamicDescriptor::DynamicDescriptor(uint64_t per_descriptor_max_tokens, | ||
| uint64_t per_descriptor_tokens_per_fill, | ||
| std::chrono::milliseconds per_descriptor_fill_interval, | ||
| uint32_t lru_size, TimeSource& time_source) | ||
| : max_tokens_(per_descriptor_max_tokens), tokens_per_fill_(per_descriptor_tokens_per_fill), | ||
| fill_interval_(per_descriptor_fill_interval), lru_size_(lru_size), time_source_(time_source) { | ||
| } | ||
|
|
||
| RateLimitTokenBucketSharedPtr | ||
| DynamicDescriptor::addOrGetDescriptor(const RateLimit::Descriptor& request_descriptor) { | ||
| absl::WriterMutexLock lock(&dyn_desc_lock_); | ||
| auto iter = dynamic_descriptors_.find(request_descriptor); | ||
| if (iter != dynamic_descriptors_.end()) { | ||
| if (iter->second.second != lru_list_.begin()) { | ||
| lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second); | ||
| } | ||
| return iter->second.first; | ||
| } | ||
| // add a new descriptor to the set along with its token bucket | ||
| RateLimitTokenBucketSharedPtr per_descriptor_token_bucket; | ||
| ENVOY_LOG(trace, "creating atomic token bucket for dynamic descriptor"); | ||
| ENVOY_LOG(trace, "max_tokens: {}, tokens_per_fill: {}, fill_interval: {}", max_tokens_, | ||
| tokens_per_fill_, std::chrono::duration<double>(fill_interval_).count()); | ||
| per_descriptor_token_bucket = std::make_shared<RateLimitTokenBucket>( | ||
| max_tokens_, tokens_per_fill_, fill_interval_, time_source_); | ||
|
|
||
| ENVOY_LOG(trace, "DynamicDescriptor::addorGetDescriptor: adding dynamic descriptor: {}", | ||
| request_descriptor.toString()); | ||
| lru_list_.emplace_front(request_descriptor); | ||
| auto result = dynamic_descriptors_.emplace( | ||
| request_descriptor, std::pair(per_descriptor_token_bucket, lru_list_.begin())); | ||
| auto token_bucket = result.first->second.first; | ||
| if (lru_list_.size() > lru_size_) { | ||
| ENVOY_LOG(trace, | ||
| "DynamicDescriptor::addorGetDescriptor: lru_size({}) overflow. Removing dynamic " | ||
| "descriptor: {}", | ||
| lru_size_, lru_list_.back().toString()); | ||
| dynamic_descriptors_.erase(lru_list_.back()); | ||
| lru_list_.pop_back(); | ||
| } | ||
| ASSERT(lru_list_.size() == dynamic_descriptors_.size()); | ||
| return token_bucket; | ||
| } | ||
|
|
||
| } // namespace LocalRateLimit | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: config_descriptor