Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
6ff6a4b
Http LocalRateLimit: Add dynamic token bucket support
vikaschoudhary16 Oct 16, 2024
71ecf21
comments
vikaschoudhary16 Oct 16, 2024
365e7c4
updates
vikaschoudhary16 Oct 16, 2024
cde6760
lru cache with max size, remove locks and basic test
vikaschoudhary16 Oct 18, 2024
e1dc1ac
some cleanup
vikaschoudhary16 Oct 18, 2024
f1e4765
updates
vikaschoudhary16 Oct 18, 2024
15f655e
updates
vikaschoudhary16 Oct 18, 2024
9d251f8
updates
vikaschoudhary16 Oct 18, 2024
ed02fa6
updates
vikaschoudhary16 Oct 18, 2024
d305bd5
cleanup
vikaschoudhary16 Oct 19, 2024
53e84bb
fix
vikaschoudhary16 Oct 19, 2024
bb34179
DynamicDescriptor interface
vikaschoudhary16 Oct 23, 2024
b8fa581
Address review comments
vikaschoudhary16 Jan 14, 2025
0966f09
Merge branch 'main' into lrl-dynamic-tokenbuckets
vikaschoudhary16 Jan 14, 2025
e9690d4
some cleanup
vikaschoudhary16 Jan 15, 2025
0b6eb05
format
vikaschoudhary16 Jan 15, 2025
d0b0433
format
vikaschoudhary16 Jan 16, 2025
3463d28
some cleanup
vikaschoudhary16 Jan 18, 2025
25aa2ab
cleanup
vikaschoudhary16 Jan 18, 2025
0a3df60
Merge branch 'main' into lrl-dynamic-tokenbuckets
vikaschoudhary16 Jan 18, 2025
eb50e85
cleanup and more tests
vikaschoudhary16 Jan 19, 2025
80a770b
test fix
vikaschoudhary16 Jan 19, 2025
9a2448e
fix
vikaschoudhary16 Jan 19, 2025
04eadba
more test
vikaschoudhary16 Jan 19, 2025
210f269
ci trigger
vikaschoudhary16 Jan 19, 2025
e697169
Merge branch 'main' into lrl-dynamic-tokenbuckets
vikaschoudhary16 Jan 19, 2025
44eefb0
ci trigger
vikaschoudhary16 Jan 19, 2025
c3b36d7
more tests
vikaschoudhary16 Jan 20, 2025
97455e8
kick ci
vikaschoudhary16 Jan 20, 2025
8c2a558
more tests
vikaschoudhary16 Jan 20, 2025
ff7d24d
format
vikaschoudhary16 Jan 20, 2025
78d3be0
kick ci
vikaschoudhary16 Jan 20, 2025
039f452
ci kick
vikaschoudhary16 Jan 20, 2025
d1d70ca
ci
vikaschoudhary16 Jan 20, 2025
c7401eb
ci
vikaschoudhary16 Jan 20, 2025
a439c5d
ci kick
vikaschoudhary16 Jan 20, 2025
eaa9c95
Merge branch 'main' into lrl-dynamic-tokenbuckets
vikaschoudhary16 Jan 30, 2025
db7a666
fix merging issues
vikaschoudhary16 Jan 31, 2025
538d597
format
vikaschoudhary16 Jan 31, 2025
e46f681
ci
vikaschoudhary16 Jan 31, 2025
605ef6e
Address review comments
vikaschoudhary16 Feb 3, 2025
5527817
PerConnection
vikaschoudhary16 Feb 3, 2025
2bff228
cleanup
vikaschoudhary16 Feb 4, 2025
894a264
Merge branch 'main' into lrl-dynamic-tokenbuckets
vikaschoudhary16 Feb 5, 2025
45496e7
ci kick
vikaschoudhary16 Feb 5, 2025
eaaf2d4
Address review comments
vikaschoudhary16 Feb 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions api/envoy/extensions/common/ratelimit/v3/ratelimit.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@ message RateLimitDescriptor {
// Descriptor key.
string key = 1 [(validate.rules).string = {min_len: 1}];

// Descriptor value.
string value = 2 [(validate.rules).string = {min_len: 1}];
// Descriptor value. A blank value is treated as a wildcard, creating a dynamic token bucket for each unique value.
// Blank values as wildcards are currently supported only with Envoy server instance level HTTP local rate limiting
// and will not work if HTTP local rate limiting is enabled at the per-connection level.
string value = 2 [(validate.rules).string = {min_len: 0}];
}

// Override rate limit to apply to this descriptor instead of the limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// Local Rate limit :ref:`configuration overview <config_http_filters_local_rate_limit>`.
// [#extension: envoy.filters.http.local_ratelimit]

// [#next-free-field: 18]
// [#next-free-field: 19]
message LocalRateLimit {
// The human readable prefix to use when emitting stats.
string stat_prefix = 1 [(validate.rules).string = {min_len: 1}];
Expand Down Expand Up @@ -167,4 +167,13 @@ message LocalRateLimit {
// 3. :ref:`disable_key <envoy_v3_api_field_config.route.v3.RateLimit.disable_key>`.
// 4. :ref:`override limit <envoy_v3_api_field_config.route.v3.RateLimit.limit>`.
repeated config.route.v3.RateLimit rate_limits = 17;

// Specifies the max dynamic descriptors kept in the cache for a particular wildcard descriptor
// configured in the global :ref:`descriptors<envoy_v3_api_field_extensions.filters.http.local_ratelimit.v3.LocalRateLimit.descriptors>`.
// A wildcard descriptor is a descriptor with one or more entries that have only a key and an omitted (blank) value. For example, if a user has
// configured two descriptors with blank value entries, then the max dynamic descriptors stored in the LRU cache will be 2 * max_dynamic_descriptors.
// The actual number of dynamic descriptors will depend on the cardinality of the unique values received in HTTP requests for the omitted
// values.
// Minimum is 1. Default is 20.
google.protobuf.UInt32Value max_dynamic_descriptors = 18 [(validate.rules).uint32 = {gte: 1}];
}
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ new_features:
change: |
Reporting a locality_stats to LRS server when ``rq_issued > 0``, disable by setting runtime guard
``envoy.reloadable_features.report_load_with_rq_issued`` to ``false``.
- area: local_rate_limit
change: |
Added support for dynamic token buckets in the local rate limit filter for HTTP requests.
- area: attributes
change: |
Added :ref:`attribute <arch_overview_attributes>` ``upstream.locality`` to obtain upstream locality information.
Expand Down
19 changes: 19 additions & 0 deletions envoy/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,25 @@ struct Descriptor {
absl::StrAppend(out, e.key_, "=", e.value_);
});
}

struct Hash {
using is_transparent = void; // NOLINT(readability-identifier-naming)
template <class DescriptorType> size_t operator()(const DescriptorType& d) const {
return absl::Hash<DescriptorEntries>()(d.entries_);
}
};
struct Equal {
using is_transparent = void; // NOLINT(readability-identifier-naming)
template <class DescriptorTypeA, class DescriptorTypeB>
size_t operator()(const DescriptorTypeA& lhs, const DescriptorTypeB& rhs) const {
return lhs.entries_ == rhs.entries_;
}
};

/**
* Descriptor map.
*/
template <class V> using Map = absl::flat_hash_map<Descriptor, V, Hash, Equal>;
};

/**
Expand Down
1 change: 1 addition & 0 deletions source/common/common/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const static bool should_log = true;
FUNCTION(kafka) \
FUNCTION(key_value_store) \
FUNCTION(lua) \
FUNCTION(local_rate_limit) \
FUNCTION(main) \
FUNCTION(matcher) \
FUNCTION(misc) \
Expand Down
154 changes: 135 additions & 19 deletions source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <chrono>
#include <cmath>
#include <memory>

#include "envoy/runtime/runtime.h"

Expand Down Expand Up @@ -80,7 +81,8 @@ RateLimitTokenBucket::RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_
TimeSource& time_source)
: token_bucket_(max_tokens, time_source,
// Calculate the fill rate in tokens per second.
tokens_per_fill / std::chrono::duration<double>(fill_interval).count()) {}
tokens_per_fill / std::chrono::duration<double>(fill_interval).count()),
fill_interval_(fill_interval) {}

bool RateLimitTokenBucket::consume(double factor, uint64_t to_consume) {
ASSERT(!(factor <= 0.0 || factor > 1.0));
Expand All @@ -93,10 +95,10 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
const uint64_t tokens_per_fill, Event::Dispatcher& dispatcher,
const Protobuf::RepeatedPtrField<
envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors,
bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider)
bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider,
uint32_t lru_size)
: time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)),
always_consume_default_token_bucket_(always_consume_default_token_bucket) {

// Ignore the default token bucket if fill_interval is 0 because 0 fill_interval means nothing
// and has undefined behavior.
if (fill_interval.count() > 0) {
Expand All @@ -109,8 +111,12 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(

for (const auto& descriptor : descriptors) {
RateLimit::LocalDescriptor new_descriptor;
bool wildcard_found = false;
new_descriptor.entries_.reserve(descriptor.entries_size());
for (const auto& entry : descriptor.entries()) {
if (entry.value().empty()) {
wildcard_found = true;
}
new_descriptor.entries_.push_back({entry.key(), entry.value()});
}

Expand All @@ -126,11 +132,17 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
throw EnvoyException("local rate limit descriptor token bucket fill timer must be >= 50ms");
}

if (wildcard_found) {
DynamicDescriptorSharedPtr dynamic_descriptor = std::make_shared<DynamicDescriptor>(
per_descriptor_max_tokens, per_descriptor_tokens_per_fill, per_descriptor_fill_interval,
lru_size, dispatcher.timeSource());
dynamic_descriptors_.addDescriptor(std::move(new_descriptor), std::move(dynamic_descriptor));
continue;
}
RateLimitTokenBucketSharedPtr per_descriptor_token_bucket =
std::make_shared<RateLimitTokenBucket>(per_descriptor_max_tokens,
per_descriptor_tokens_per_fill,
per_descriptor_fill_interval, time_source_);

auto result =
descriptors_.emplace(std::move(new_descriptor), std::move(per_descriptor_token_bucket));
if (!result.second) {
Expand All @@ -143,12 +155,12 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
LocalRateLimiterImpl::~LocalRateLimiterImpl() = default;

struct MatchResult {
std::reference_wrapper<RateLimitTokenBucket> token_bucket;
RateLimitTokenBucketSharedPtr token_bucket;
std::reference_wrapper<const RateLimit::Descriptor> request_descriptor;
};

LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed(
absl::Span<const RateLimit::Descriptor> request_descriptors) const {
LocalRateLimiterImpl::Result
LocalRateLimiterImpl::requestAllowed(absl::Span<const RateLimit::Descriptor> request_descriptors) {

// In most cases the request descriptors has only few elements. We use a inlined vector to
// avoid heap allocation.
Expand All @@ -158,55 +170,159 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed(
for (const auto& request_descriptor : request_descriptors) {
auto iter = descriptors_.find(request_descriptor);
if (iter != descriptors_.end()) {
matched_results.push_back(MatchResult{*iter->second, request_descriptor});
matched_results.push_back(MatchResult{iter->second, request_descriptor});
} else {
auto token_bucket = dynamic_descriptors_.getBucket(request_descriptor);
if (token_bucket != nullptr) {
matched_results.push_back(MatchResult{token_bucket, request_descriptor});
}
}
}

if (matched_results.size() > 1) {
// Sort the matched descriptors by token bucket fill rate to ensure the descriptor with the
// smallest fill rate is consumed first.
std::sort(matched_results.begin(), matched_results.end(), [](const auto& lhs, const auto& rhs) {
return lhs.token_bucket.get().fillRate() < rhs.token_bucket.get().fillRate();
return lhs.token_bucket->fillRate() < rhs.token_bucket->fillRate();
});
}

const double share_factor =
share_provider_ != nullptr ? share_provider_->getTokensShareFactor() : 1.0;

// See if the request is forbidden by any of the matched descriptors.
for (auto match_result : matched_results) {
if (!match_result.token_bucket.get().consume(
for (const auto& match_result : matched_results) {
if (!match_result.token_bucket->consume(
share_factor, match_result.request_descriptor.get().hits_addend_.value_or(1))) {
// If the request is forbidden by a descriptor, return the result and the descriptor
// token bucket.
return {false, makeOptRef<TokenBucketContext>(match_result.token_bucket.get())};
return {false, std::shared_ptr<TokenBucketContext>(match_result.token_bucket)};
}
ENVOY_LOG(trace,
"request allowed by descriptor with fill rate: {}, maxToken: {}, remainingToken {}",
match_result.token_bucket->fillRate(), match_result.token_bucket->maxTokens(),
match_result.token_bucket->remainingTokens());
}

// See if the request is forbidden by the default token bucket.
if (matched_results.empty() || always_consume_default_token_bucket_) {
if (default_token_bucket_ == nullptr) {
return {true, matched_results.empty()
? makeOptRefFromPtr<TokenBucketContext>(nullptr)
: makeOptRef<TokenBucketContext>(matched_results[0].token_bucket.get())};
? std::shared_ptr<TokenBucketContext>(nullptr)
: std::shared_ptr<TokenBucketContext>(matched_results[0].token_bucket)};
}
ASSERT(default_token_bucket_ != nullptr);

if (const bool result = default_token_bucket_->consume(share_factor); !result) {
// If the request is forbidden by the default token bucket, return the result and the
// default token bucket.
return {false, makeOptRefFromPtr<TokenBucketContext>(default_token_bucket_.get())};
return {false, std::shared_ptr<TokenBucketContext>(default_token_bucket_)};
}

// If the request is allowed then return the result the token bucket. The descriptor
// token bucket will be selected as priority if it exists.
return {true, makeOptRef<TokenBucketContext>(matched_results.empty()
? *default_token_bucket_
: matched_results[0].token_bucket.get())};
return {true,
matched_results.empty() ? default_token_bucket_ : matched_results[0].token_bucket};
};

ASSERT(!matched_results.empty());
return {true, makeOptRef<TokenBucketContext>(matched_results[0].token_bucket.get())};
std::shared_ptr<TokenBucketContext> bucket_context =
std::shared_ptr<TokenBucketContext>(matched_results[0].token_bucket);
return {true, bucket_context};
}

// Compares the request descriptor entries against the configured (user) descriptor entries.
// Returns true when every entry matches pairwise: keys must be equal, and the configured
// value must either equal the request value or be empty (empty acts as a wildcard that
// accepts any request value).
bool DynamicDescriptorMap::matchDescriptorEntries(
    const std::vector<RateLimit::DescriptorEntry>& request_entries,
    const std::vector<RateLimit::DescriptorEntry>& config_entries) {
  // Entry lists of different lengths can never match.
  if (config_entries.size() != request_entries.size()) {
    return false;
  }

  for (size_t i = 0; i < config_entries.size(); ++i) {
    const auto& request_entry = request_entries[i];
    const auto& config_entry = config_entries[i];

    // Keys must match exactly; a non-empty configured value must also match exactly,
    // while an empty configured value is a wildcard and matches anything.
    if (request_entry.key_ != config_entry.key_) {
      return false;
    }
    if (!config_entry.value_.empty() && request_entry.value_ != config_entry.value_) {
      return false;
    }
  }
  return true;
}

// Registers a configured wildcard descriptor with its dynamic-descriptor store.
// Throws EnvoyException if the same descriptor was already registered, since duplicate
// wildcard descriptors in the configuration are ambiguous.
void DynamicDescriptorMap::addDescriptor(const RateLimit::LocalDescriptor& config_descriptor,
                                         DynamicDescriptorSharedPtr dynamic_descriptor) {
  const auto [iter, inserted] =
      config_descriptors_.emplace(config_descriptor, std::move(dynamic_descriptor));
  if (!inserted) {
    throw EnvoyException(absl::StrCat("duplicate descriptor in the local rate descriptor: ",
                                      iter->first.toString()));
  }
}

// Looks up (or lazily creates) the token bucket for a request descriptor that matches one of
// the configured wildcard descriptors. Returns nullptr when no wildcard descriptor matches.
// NOTE(review): the parameter is taken by value to match the existing declaration — TODO
// confirm whether the header can be changed to pass by const reference to avoid a copy.
RateLimitTokenBucketSharedPtr
DynamicDescriptorMap::getBucket(const RateLimit::Descriptor request_descriptor) {
  // Bind by reference: the previous code copied each configured descriptor (and its
  // entries vector of strings) on every iteration.
  for (const auto& [config_descriptor, dynamic_descriptor] : config_descriptors_) {
    if (!matchDescriptorEntries(request_descriptor.entries_, config_descriptor.entries_)) {
      continue;
    }

    // A user-configured wildcard descriptor matches the request descriptor: delegate to the
    // per-wildcard store, which creates or fetches the bucket keyed by the concrete values.
    return dynamic_descriptor->addOrGetDescriptor(request_descriptor);
  }
  // No configured wildcard descriptor matches this request.
  return nullptr;
}

// Stores the token bucket parameters shared by every dynamic descriptor derived from a single
// configured wildcard descriptor, plus the LRU capacity bound. Individual token buckets are
// created lazily in addOrGetDescriptor() when a matching request descriptor is first seen.
DynamicDescriptor::DynamicDescriptor(uint64_t per_descriptor_max_tokens,
                                     uint64_t per_descriptor_tokens_per_fill,
                                     std::chrono::milliseconds per_descriptor_fill_interval,
                                     uint32_t lru_size, TimeSource& time_source)
    : max_tokens_(per_descriptor_max_tokens), tokens_per_fill_(per_descriptor_tokens_per_fill),
      fill_interval_(per_descriptor_fill_interval), lru_size_(lru_size), time_source_(time_source) {
}

// Returns the token bucket for the given concrete request descriptor, creating it on first
// use. Entries are kept in an LRU cache bounded by lru_size_; the least recently used
// descriptor is evicted once the bound is exceeded. All map/list mutation happens under
// dyn_desc_lock_, so this is safe to call from multiple worker threads.
RateLimitTokenBucketSharedPtr
DynamicDescriptor::addOrGetDescriptor(const RateLimit::Descriptor& request_descriptor) {
  absl::WriterMutexLock lock(&dyn_desc_lock_);
  auto iter = dynamic_descriptors_.find(request_descriptor);
  if (iter != dynamic_descriptors_.end()) {
    // Cache hit: splice the entry to the front of the LRU list to mark it most recently
    // used. splice() moves the node without invalidating the iterator stored in the map.
    if (iter->second.second != lru_list_.begin()) {
      lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second);
    }
    return iter->second.first;
  }
  // add a new descriptor to the set along with its token bucket
  RateLimitTokenBucketSharedPtr per_descriptor_token_bucket;
  ENVOY_LOG(trace, "creating atomic token bucket for dynamic descriptor");
  ENVOY_LOG(trace, "max_tokens: {}, tokens_per_fill: {}, fill_interval: {}", max_tokens_,
            tokens_per_fill_, std::chrono::duration<double>(fill_interval_).count());
  per_descriptor_token_bucket = std::make_shared<RateLimitTokenBucket>(
      max_tokens_, tokens_per_fill_, fill_interval_, time_source_);

  ENVOY_LOG(trace, "DynamicDescriptor::addorGetDescriptor: adding dynamic descriptor: {}",
            request_descriptor.toString());
  // Insert at the front of the LRU list first so the map can store an iterator to the new
  // node alongside the bucket.
  lru_list_.emplace_front(request_descriptor);
  auto result = dynamic_descriptors_.emplace(
      request_descriptor, std::pair(per_descriptor_token_bucket, lru_list_.begin()));
  auto token_bucket = result.first->second.first;
  // Evict the least recently used descriptor (list tail) when over capacity. With
  // lru_size_ >= 1 the just-inserted front entry is never the one evicted.
  if (lru_list_.size() > lru_size_) {
    ENVOY_LOG(trace,
              "DynamicDescriptor::addorGetDescriptor: lru_size({}) overflow. Removing dynamic "
              "descriptor: {}",
              lru_size_, lru_list_.back().toString());
    dynamic_descriptors_.erase(lru_list_.back());
    lru_list_.pop_back();
  }
  // Invariant: the map and the LRU list always track exactly the same set of descriptors.
  ASSERT(lru_list_.size() == dynamic_descriptors_.size());
  return token_bucket;
}

} // namespace LocalRateLimit
Expand Down
Loading
Loading