-
Notifications
You must be signed in to change notification settings - Fork 5.5k
LocalRateLimit(HTTP): Add dynamic token bucket support #36623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
6ff6a4b
71ecf21
365e7c4
cde6760
e1dc1ac
f1e4765
15f655e
9d251f8
ed02fa6
d305bd5
53e84bb
bb34179
b8fa581
0966f09
e9690d4
0b6eb05
d0b0433
3463d28
25aa2ab
0a3df60
eb50e85
80a770b
9a2448e
04eadba
210f269
e697169
44eefb0
c3b36d7
97455e8
8c2a558
ff7d24d
78d3be0
039f452
d1d70ca
c7401eb
a439c5d
eaa9c95
db7a666
538d597
e46f681
605ef6e
5527817
2bff228
894a264
45496e7
eaaf2d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| #include <chrono> | ||
| #include <cmath> | ||
| #include <memory> | ||
|
|
||
| #include "envoy/runtime/runtime.h" | ||
|
|
||
|
|
@@ -159,7 +160,8 @@ AtomicTokenBucket::AtomicTokenBucket(uint32_t max_tokens, uint32_t tokens_per_fi | |
| TimeSource& time_source) | ||
| : token_bucket_(max_tokens, time_source, | ||
| // Calculate the fill rate in tokens per second. | ||
| tokens_per_fill / std::chrono::duration<double>(fill_interval).count()) {} | ||
| tokens_per_fill / std::chrono::duration<double>(fill_interval).count()), | ||
| fill_interval_(fill_interval) {} | ||
|
|
||
| bool AtomicTokenBucket::consume(double factor) { | ||
| ASSERT(!(factor <= 0.0 || factor > 1.0)); | ||
|
|
@@ -172,14 +174,16 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( | |
| const uint32_t tokens_per_fill, Event::Dispatcher& dispatcher, | ||
| const Protobuf::RepeatedPtrField< | ||
| envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors, | ||
| bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider) | ||
| bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider, | ||
| uint32_t lru_size, bool per_connection) | ||
| : fill_timer_(fill_interval > std::chrono::milliseconds(0) | ||
| ? dispatcher.createTimer([this] { onFillTimer(); }) | ||
| : nullptr), | ||
| time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)), | ||
| always_consume_default_token_bucket_(always_consume_default_token_bucket), | ||
| no_timer_based_rate_limit_token_bucket_(Runtime::runtimeFeatureEnabled( | ||
| "envoy.reloadable_features.no_timer_based_rate_limit_token_bucket")) { | ||
| "envoy.reloadable_features.no_timer_based_rate_limit_token_bucket")), | ||
| dispatcher_(dispatcher) { | ||
| if (fill_timer_ && fill_interval < std::chrono::milliseconds(50)) { | ||
| throw EnvoyException("local rate limit token bucket fill timer must be >= 50ms"); | ||
| } | ||
|
|
@@ -199,8 +203,16 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( | |
|
|
||
| for (const auto& descriptor : descriptors) { | ||
| RateLimit::LocalDescriptor new_descriptor; | ||
| bool wildcard_found = false; | ||
| new_descriptor.entries_.reserve(descriptor.entries_size()); | ||
| for (const auto& entry : descriptor.entries()) { | ||
| if (entry.value().empty()) { | ||
| if (per_connection) { | ||
| throw EnvoyException( | ||
| "local rate descriptor value cannot be empty in per connection rate limit mode"); | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this restriction is necessary?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not necessary. Just wanted to reduce scope and cover it in follow up. I think it will require just passing |
||
| wildcard_found = true; | ||
| } | ||
| new_descriptor.entries_.push_back({entry.key(), entry.value()}); | ||
| } | ||
|
|
||
|
|
@@ -227,7 +239,13 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( | |
| per_descriptor_max_tokens, per_descriptor_tokens_per_fill, per_descriptor_fill_interval, | ||
| per_descriptor_multiplier, *this); | ||
| } | ||
|
|
||
| if (wildcard_found) { | ||
| DynamicDescriptorSharedPtr dynamic_descriptor = std::make_shared<DynamicDescriptor>( | ||
| per_descriptor_token_bucket, (lru_size == 0 ? 20 : lru_size), dispatcher.timeSource(), | ||
| *this); | ||
| dynamic_descriptors_.addDescriptor(std::move(new_descriptor), std::move(dynamic_descriptor)); | ||
| continue; | ||
| } | ||
| auto result = | ||
| descriptors_.emplace(std::move(new_descriptor), std::move(per_descriptor_token_bucket)); | ||
| if (!result.second) { | ||
|
|
@@ -257,12 +275,13 @@ void LocalRateLimiterImpl::onFillTimer() { | |
| for (const auto& descriptor : descriptors_) { | ||
| descriptor.second->onFillTimer(refill_counter_, share_factor); | ||
| } | ||
| dynamic_descriptors_.onFillTimer(refill_counter_, share_factor); | ||
|
|
||
| fill_timer_->enableTimer(default_token_bucket_->fillInterval()); | ||
| } | ||
|
|
||
| LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( | ||
| absl::Span<const RateLimit::LocalDescriptor> request_descriptors) const { | ||
| absl::Span<const RateLimit::LocalDescriptor> request_descriptors) { | ||
|
|
||
| // In most cases the request descriptors has only few elements. We use a inlined vector to | ||
| // avoid heap allocation. | ||
|
|
@@ -273,6 +292,11 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( | |
| auto iter = descriptors_.find(request_descriptor); | ||
| if (iter != descriptors_.end()) { | ||
| matched_descriptors.push_back(iter->second.get()); | ||
| } else { | ||
| auto token_bucket = dynamic_descriptors_.getBucket(request_descriptor); | ||
| if (token_bucket != nullptr) { | ||
| matched_descriptors.push_back(token_bucket.get()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -294,6 +318,10 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( | |
| // If the request is forbidden by a descriptor, return the result and the descriptor | ||
| // token bucket. | ||
| return {false, makeOptRefFromPtr<TokenBucketContext>(descriptor)}; | ||
| } else { | ||
| ENVOY_LOG(trace, | ||
| "request allowed by descriptor with fill rate: {}, maxToken: {}, remainingToken {}", | ||
| descriptor->fillRate(), descriptor->maxTokens(), descriptor->remainingTokens()); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -316,6 +344,132 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( | |
| return {true, makeOptRefFromPtr<TokenBucketContext>(matched_descriptors[0])}; | ||
| } | ||
|
|
||
| // Compare the request descriptor entries with the user descriptor entries. If all non-empty user | ||
| // descriptor values match the request descriptor values, return true and fill the new descriptor | ||
| bool DynamicDescriptorMap::compareDescriptorEntries( | ||
| const std::vector<RateLimit::DescriptorEntry>& request_entries, | ||
| const std::vector<RateLimit::DescriptorEntry>& user_entries, | ||
| std::vector<RateLimit::DescriptorEntry>& new_descriptor_entries) { | ||
| // Check for equality of sizes | ||
| if (request_entries.size() != user_entries.size()) { | ||
| return false; | ||
| } | ||
|
|
||
| bool has_empty_value = false; | ||
| for (size_t i = 0; i < request_entries.size(); ++i) { | ||
| // Check if the keys are equal | ||
| if (request_entries[i].key_ != user_entries[i].key_) { | ||
| return false; | ||
| } | ||
|
|
||
| // all non-blank user values must match the request values | ||
| if (!user_entries[i].value_.empty() && user_entries[i].value_ != request_entries[i].value_) { | ||
| return false; | ||
| } | ||
|
|
||
| // Check for empty value in user entries | ||
| if (user_entries[i].value_.empty()) { | ||
| has_empty_value = true; | ||
| } | ||
| new_descriptor_entries.push_back({request_entries[i].key_, request_entries[i].value_}); | ||
| } | ||
| return has_empty_value; | ||
| } | ||
|
|
||
| void DynamicDescriptorMap::addDescriptor(const RateLimit::LocalDescriptor& user_descriptor, | ||
| DynamicDescriptorSharedPtr dynamic_descriptor) { | ||
| auto result = user_descriptors_.emplace(user_descriptor, std::move(dynamic_descriptor)); | ||
| if (!result.second) { | ||
| throw EnvoyException(absl::StrCat("duplicate descriptor in the local rate descriptor: ", | ||
| result.first->first.toString())); | ||
| } | ||
| } | ||
|
|
||
| RateLimitTokenBucketSharedPtr | ||
| DynamicDescriptorMap::getBucket(RateLimit::LocalDescriptor request_descriptor) { | ||
| for (const auto& pair : user_descriptors_) { | ||
| auto user_descriptor = pair.first; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: config_descriptor |
||
| if (user_descriptor.entries_.size() != request_descriptor.entries_.size()) { | ||
| continue; | ||
| } | ||
| RateLimit::LocalDescriptor new_descriptor; | ||
| bool wildcard_found = false; | ||
| new_descriptor.entries_.reserve(user_descriptor.entries_.size()); | ||
| wildcard_found = compareDescriptorEntries(request_descriptor.entries_, user_descriptor.entries_, | ||
| new_descriptor.entries_); | ||
|
|
||
| if (!wildcard_found) { | ||
| continue; | ||
| } | ||
|
|
||
| // we found a user configured wildcard descriptor that matches the request descriptor. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please all check all these comments match our style.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have updated this specific comment. Is there a doc/readme that have guidelines about the comment styles for my reference? |
||
| return pair.second->addOrGetDescriptor(request_descriptor); | ||
| } | ||
| return nullptr; | ||
| } | ||
|
|
||
| void DynamicDescriptorMap::onFillTimer(uint64_t refill_counter, double factor) { | ||
| for (const auto& pair : user_descriptors_) { | ||
| pair.second->onFillTimer(refill_counter, factor); | ||
| } | ||
| } | ||
|
|
||
| DynamicDescriptor::DynamicDescriptor(RateLimitTokenBucketSharedPtr token_bucket, uint32_t lru_size, | ||
| TimeSource& time_source, LocalRateLimiterImpl& parent) | ||
| : parent_token_bucket_(token_bucket), lru_size_(lru_size), time_source_(time_source), | ||
| no_timer_based_rate_limit_token_bucket_(Runtime::runtimeFeatureEnabled( | ||
| "envoy.reloadable_features.no_timer_based_rate_limit_token_bucket")), | ||
| parent_(parent) {} | ||
|
|
||
| RateLimitTokenBucketSharedPtr | ||
| DynamicDescriptor::addOrGetDescriptor(const RateLimit::LocalDescriptor& request_descriptor) { | ||
| absl::WriterMutexLock lock(&dyn_desc_lock_); | ||
| auto iter = dynamic_descriptors_.find(request_descriptor); | ||
| if (iter != dynamic_descriptors_.end()) { | ||
| lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second); | ||
| return iter->second.first; | ||
| } | ||
| // add a new descriptor to the set along with its toekn bucket | ||
| RateLimitTokenBucketSharedPtr per_descriptor_token_bucket; | ||
| if (no_timer_based_rate_limit_token_bucket_) { | ||
| ENVOY_LOG(trace, "creating atomic token bucket for dynamic descriptor"); | ||
| ENVOY_LOG(trace, "max_tokens: {}, fill_rate: {}, fill_interval: {}", | ||
| parent_token_bucket_->maxTokens(), parent_token_bucket_->fillRate(), | ||
| std::chrono::duration<double>(parent_token_bucket_->fillInterval()).count()); | ||
| per_descriptor_token_bucket = std::make_shared<AtomicTokenBucket>( | ||
| parent_token_bucket_->maxTokens(), | ||
| uint32_t(parent_token_bucket_->fillRate() * | ||
| std::chrono::duration<double>(parent_token_bucket_->fillInterval()).count()), | ||
| parent_token_bucket_->fillInterval(), time_source_); | ||
| } else { | ||
| per_descriptor_token_bucket = std::make_shared<TimerTokenBucket>( | ||
| parent_token_bucket_->maxTokens(), | ||
| uint32_t(parent_token_bucket_->fillRate() * | ||
| std::chrono::duration<double>(parent_token_bucket_->fillInterval()).count()), | ||
| parent_token_bucket_->fillInterval(), parent_token_bucket_->multiplier(), parent_); | ||
| } | ||
|
|
||
| ENVOY_LOG(trace, "DynamicDescriptor::addorGetDescriptor: adding dynamic descriptor: {}", | ||
| request_descriptor.toString()); | ||
| // add this bucket to cache. | ||
| // After updating cache, make a copy of the cache and update the tls with the new cache. | ||
| auto result = dynamic_descriptors_.emplace( | ||
| request_descriptor, std::pair(per_descriptor_token_bucket, lru_list_.begin())); | ||
| lru_list_.emplace_front(request_descriptor); | ||
| if (lru_list_.size() >= lru_size_) { | ||
| dynamic_descriptors_.erase(lru_list_.back()); | ||
| lru_list_.pop_back(); | ||
| } | ||
| return result.first->second.first; | ||
| } | ||
|
|
||
| void DynamicDescriptor::onFillTimer(uint64_t refill_counter, double factor) { | ||
| absl::WriterMutexLock lock(&dyn_desc_lock_); | ||
| for (auto& pair : dynamic_descriptors_) { | ||
| pair.second.first->onFillTimer(refill_counter, factor); | ||
| } | ||
| } | ||
|
|
||
| } // namespace LocalRateLimit | ||
| } // namespace Common | ||
| } // namespace Filters | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May name this simplely
max_dynamic_descripters. (We may change the elimination algorithm in future, who know?) and please use wrapper number typegoogle.protobuf.UInt32Value.And Please add explict bool to enable this feature, like
google.protobuf.BoolValue use_dynamic_descripters.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done