diff --git a/api/envoy/extensions/common/ratelimit/v3/ratelimit.proto b/api/envoy/extensions/common/ratelimit/v3/ratelimit.proto index 77deff292c9ef..f9cba6de128de 100644 --- a/api/envoy/extensions/common/ratelimit/v3/ratelimit.proto +++ b/api/envoy/extensions/common/ratelimit/v3/ratelimit.proto @@ -103,8 +103,10 @@ message RateLimitDescriptor { // Descriptor key. string key = 1 [(validate.rules).string = {min_len: 1}]; - // Descriptor value. - string value = 2 [(validate.rules).string = {min_len: 1}]; + // Descriptor value. Blank value is treated as wildcard to create dynamic token buckets for each unique value. + // Blank Values as wild card is currently supported only with envoy server instance level HTTP local rate limiting + // and will not work if HTTP local rate limiting is enabled per connection level. + string value = 2 [(validate.rules).string = {min_len: 0}]; } // Override rate limit to apply to this descriptor instead of the limit diff --git a/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto b/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto index 82e38ed91d5a6..b0199c04b7263 100644 --- a/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto +++ b/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto @@ -23,7 +23,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // Local Rate limit :ref:`configuration overview `. // [#extension: envoy.filters.http.local_ratelimit] -// [#next-free-field: 18] +// [#next-free-field: 19] message LocalRateLimit { // The human readable prefix to use when emitting stats. string stat_prefix = 1 [(validate.rules).string = {min_len: 1}]; @@ -167,4 +167,13 @@ message LocalRateLimit { // 3. :ref:`disable_key `. // 4. :ref:`override limit `. 
repeated config.route.v3.RateLimit rate_limits = 17; + + // Specifies the max dynamic descriptors kept in the cache for a particular wildcard descriptor + // configured in the global :ref:`descriptors`. + // Wildcard descriptor means descriptor has one or more entries with just key and value omitted. For example if user has configured two descriptors + // with blank value entries, then max dynamic descriptors stored in the LRU cache will be 2 * max_dynamic_descriptors. + // Actual number of dynamic descriptors will depend on the cardinality of unique values received from the http request for the omitted + // values. + // Minimum is 1. Default is 20. + google.protobuf.UInt32Value max_dynamic_descriptors = 18 [(validate.rules).uint32 = {gte: 1}]; } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index b6b571536e37d..b66514fd31230 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -132,6 +132,9 @@ new_features: change: | Reporting a locality_stats to LRS server when ``rq_issued > 0``, disable by setting runtime guard ``envoy.reloadable_features.report_load_with_rq_issued`` to ``false``. +- area: local_rate_limit + change: | + Added support for dynamic token buckets in local rate limit filter for http requests. - area: attributes change: | Added :ref:`attribute ` ``upstream.locality`` to obtain upstream locality information. 
diff --git a/envoy/ratelimit/ratelimit.h b/envoy/ratelimit/ratelimit.h index 3f3f1bc4d823e..971b0c9b7a7f0 100644 --- a/envoy/ratelimit/ratelimit.h +++ b/envoy/ratelimit/ratelimit.h @@ -58,6 +58,25 @@ struct Descriptor { absl::StrAppend(out, e.key_, "=", e.value_); }); } + + struct Hash { + using is_transparent = void; // NOLINT(readability-identifier-naming) + template size_t operator()(const DescriptorType& d) const { + return absl::Hash()(d.entries_); + } + }; + struct Equal { + using is_transparent = void; // NOLINT(readability-identifier-naming) + template + size_t operator()(const DescriptorTypeA& lhs, const DescriptorTypeB& rhs) const { + return lhs.entries_ == rhs.entries_; + } + }; + + /** + * Descriptor map. + */ + template using Map = absl::flat_hash_map; }; /** diff --git a/source/common/common/logger.h b/source/common/common/logger.h index 761da45a97ca3..8db5f52bc52b1 100644 --- a/source/common/common/logger.h +++ b/source/common/common/logger.h @@ -71,6 +71,7 @@ const static bool should_log = true; FUNCTION(kafka) \ FUNCTION(key_value_store) \ FUNCTION(lua) \ + FUNCTION(local_rate_limit) \ FUNCTION(main) \ FUNCTION(matcher) \ FUNCTION(misc) \ diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc index 72c6975e8f21c..1ff6b295b0d52 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc @@ -2,6 +2,7 @@ #include #include +#include #include "envoy/runtime/runtime.h" @@ -80,7 +81,8 @@ RateLimitTokenBucket::RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_ TimeSource& time_source) : token_bucket_(max_tokens, time_source, // Calculate the fill rate in tokens per second. 
- tokens_per_fill / std::chrono::duration(fill_interval).count()) {} + tokens_per_fill / std::chrono::duration(fill_interval).count()), + fill_interval_(fill_interval) {} bool RateLimitTokenBucket::consume(double factor, uint64_t to_consume) { ASSERT(!(factor <= 0.0 || factor > 1.0)); @@ -93,10 +95,10 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( const uint64_t tokens_per_fill, Event::Dispatcher& dispatcher, const Protobuf::RepeatedPtrField< envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors, - bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider) + bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider, + uint32_t lru_size) : time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)), always_consume_default_token_bucket_(always_consume_default_token_bucket) { - // Ignore the default token bucket if fill_interval is 0 because 0 fill_interval means nothing // and has undefined behavior. 
if (fill_interval.count() > 0) { @@ -109,8 +111,12 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( for (const auto& descriptor : descriptors) { RateLimit::LocalDescriptor new_descriptor; + bool wildcard_found = false; new_descriptor.entries_.reserve(descriptor.entries_size()); for (const auto& entry : descriptor.entries()) { + if (entry.value().empty()) { + wildcard_found = true; + } new_descriptor.entries_.push_back({entry.key(), entry.value()}); } @@ -126,11 +132,17 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( throw EnvoyException("local rate limit descriptor token bucket fill timer must be >= 50ms"); } + if (wildcard_found) { + DynamicDescriptorSharedPtr dynamic_descriptor = std::make_shared( + per_descriptor_max_tokens, per_descriptor_tokens_per_fill, per_descriptor_fill_interval, + lru_size, dispatcher.timeSource()); + dynamic_descriptors_.addDescriptor(std::move(new_descriptor), std::move(dynamic_descriptor)); + continue; + } RateLimitTokenBucketSharedPtr per_descriptor_token_bucket = std::make_shared(per_descriptor_max_tokens, per_descriptor_tokens_per_fill, per_descriptor_fill_interval, time_source_); - auto result = descriptors_.emplace(std::move(new_descriptor), std::move(per_descriptor_token_bucket)); if (!result.second) { @@ -143,12 +155,12 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( LocalRateLimiterImpl::~LocalRateLimiterImpl() = default; struct MatchResult { - std::reference_wrapper token_bucket; + RateLimitTokenBucketSharedPtr token_bucket; std::reference_wrapper request_descriptor; }; -LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( - absl::Span request_descriptors) const { +LocalRateLimiterImpl::Result +LocalRateLimiterImpl::requestAllowed(absl::Span request_descriptors) { // In most cases the request descriptors has only few elements. We use a inlined vector to // avoid heap allocation. 
@@ -158,7 +170,12 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( for (const auto& request_descriptor : request_descriptors) { auto iter = descriptors_.find(request_descriptor); if (iter != descriptors_.end()) { - matched_results.push_back(MatchResult{*iter->second, request_descriptor}); + matched_results.push_back(MatchResult{iter->second, request_descriptor}); + } else { + auto token_bucket = dynamic_descriptors_.getBucket(request_descriptor); + if (token_bucket != nullptr) { + matched_results.push_back(MatchResult{token_bucket, request_descriptor}); + } } } @@ -166,7 +183,7 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( // Sort the matched descriptors by token bucket fill rate to ensure the descriptor with the // smallest fill rate is consumed first. std::sort(matched_results.begin(), matched_results.end(), [](const auto& lhs, const auto& rhs) { - return lhs.token_bucket.get().fillRate() < rhs.token_bucket.get().fillRate(); + return lhs.token_bucket->fillRate() < rhs.token_bucket->fillRate(); }); } @@ -174,39 +191,138 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( share_provider_ != nullptr ? share_provider_->getTokensShareFactor() : 1.0; // See if the request is forbidden by any of the matched descriptors. - for (auto match_result : matched_results) { - if (!match_result.token_bucket.get().consume( + for (const auto& match_result : matched_results) { + if (!match_result.token_bucket->consume( share_factor, match_result.request_descriptor.get().hits_addend_.value_or(1))) { // If the request is forbidden by a descriptor, return the result and the descriptor // token bucket. 
- return {false, makeOptRef(match_result.token_bucket.get())}; + return {false, std::shared_ptr(match_result.token_bucket)}; } + ENVOY_LOG(trace, + "request allowed by descriptor with fill rate: {}, maxToken: {}, remainingToken {}", + match_result.token_bucket->fillRate(), match_result.token_bucket->maxTokens(), + match_result.token_bucket->remainingTokens()); } // See if the request is forbidden by the default token bucket. if (matched_results.empty() || always_consume_default_token_bucket_) { if (default_token_bucket_ == nullptr) { return {true, matched_results.empty() - ? makeOptRefFromPtr(nullptr) - : makeOptRef(matched_results[0].token_bucket.get())}; + ? std::shared_ptr(nullptr) + : std::shared_ptr(matched_results[0].token_bucket)}; } ASSERT(default_token_bucket_ != nullptr); if (const bool result = default_token_bucket_->consume(share_factor); !result) { // If the request is forbidden by the default token bucket, return the result and the // default token bucket. - return {false, makeOptRefFromPtr(default_token_bucket_.get())}; + return {false, std::shared_ptr(default_token_bucket_)}; } // If the request is allowed then return the result the token bucket. The descriptor // token bucket will be selected as priority if it exists. - return {true, makeOptRef(matched_results.empty() - ? *default_token_bucket_ - : matched_results[0].token_bucket.get())}; + return {true, + matched_results.empty() ? default_token_bucket_ : matched_results[0].token_bucket}; }; ASSERT(!matched_results.empty()); - return {true, makeOptRef(matched_results[0].token_bucket.get())}; + std::shared_ptr bucket_context = + std::shared_ptr(matched_results[0].token_bucket); + return {true, bucket_context}; +} + +// Compare the request descriptor entries with the user descriptor entries. 
If all non-empty user +// descriptor values match the request descriptor values, return true +bool DynamicDescriptorMap::matchDescriptorEntries( + const std::vector& request_entries, + const std::vector& config_entries) { + // Check for equality of sizes + if (request_entries.size() != config_entries.size()) { + return false; + } + + for (size_t i = 0; i < request_entries.size(); ++i) { + // Check if the keys are equal. + if (request_entries[i].key_ != config_entries[i].key_) { + return false; + } + + // Check values are equal or wildcard value is used. + if (config_entries[i].value_.empty()) { + continue; + } + if (request_entries[i].value_ != config_entries[i].value_) { + return false; + } + } + return true; +} + +void DynamicDescriptorMap::addDescriptor(const RateLimit::LocalDescriptor& config_descriptor, + DynamicDescriptorSharedPtr dynamic_descriptor) { + auto result = config_descriptors_.emplace(config_descriptor, std::move(dynamic_descriptor)); + if (!result.second) { + throw EnvoyException(absl::StrCat("duplicate descriptor in the local rate descriptor: ", + result.first->first.toString())); + } +} + +RateLimitTokenBucketSharedPtr +DynamicDescriptorMap::getBucket(const RateLimit::Descriptor request_descriptor) { + for (const auto& pair : config_descriptors_) { + auto config_descriptor = pair.first; + if (!matchDescriptorEntries(request_descriptor.entries_, config_descriptor.entries_)) { + continue; + } + + // here is when a user configured wildcard descriptor matches the request descriptor. 
+ return pair.second->addOrGetDescriptor(request_descriptor); + } + return nullptr; +} + +DynamicDescriptor::DynamicDescriptor(uint64_t per_descriptor_max_tokens, + uint64_t per_descriptor_tokens_per_fill, + std::chrono::milliseconds per_descriptor_fill_interval, + uint32_t lru_size, TimeSource& time_source) + : max_tokens_(per_descriptor_max_tokens), tokens_per_fill_(per_descriptor_tokens_per_fill), + fill_interval_(per_descriptor_fill_interval), lru_size_(lru_size), time_source_(time_source) { +} + +RateLimitTokenBucketSharedPtr +DynamicDescriptor::addOrGetDescriptor(const RateLimit::Descriptor& request_descriptor) { + absl::WriterMutexLock lock(&dyn_desc_lock_); + auto iter = dynamic_descriptors_.find(request_descriptor); + if (iter != dynamic_descriptors_.end()) { + if (iter->second.second != lru_list_.begin()) { + lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second); + } + return iter->second.first; + } + // add a new descriptor to the set along with its token bucket + RateLimitTokenBucketSharedPtr per_descriptor_token_bucket; + ENVOY_LOG(trace, "creating atomic token bucket for dynamic descriptor"); + ENVOY_LOG(trace, "max_tokens: {}, tokens_per_fill: {}, fill_interval: {}", max_tokens_, + tokens_per_fill_, std::chrono::duration(fill_interval_).count()); + per_descriptor_token_bucket = std::make_shared( + max_tokens_, tokens_per_fill_, fill_interval_, time_source_); + + ENVOY_LOG(trace, "DynamicDescriptor::addorGetDescriptor: adding dynamic descriptor: {}", + request_descriptor.toString()); + lru_list_.emplace_front(request_descriptor); + auto result = dynamic_descriptors_.emplace( + request_descriptor, std::pair(per_descriptor_token_bucket, lru_list_.begin())); + auto token_bucket = result.first->second.first; + if (lru_list_.size() > lru_size_) { + ENVOY_LOG(trace, + "DynamicDescriptor::addorGetDescriptor: lru_size({}) overflow. 
Removing dynamic " + "descriptor: {}", + lru_size_, lru_list_.back().toString()); + dynamic_descriptors_.erase(lru_list_.back()); + lru_list_.pop_back(); + } + ASSERT(lru_list_.size() == dynamic_descriptors_.size()); + return token_bucket; } } // namespace LocalRateLimit diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h index 8c0a0bf97124b..cc3339e6580e2 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h @@ -20,8 +20,49 @@ namespace Filters { namespace Common { namespace LocalRateLimit { +class LocalRateLimiterImpl; +class RateLimitTokenBucket; +using RateLimitTokenBucketSharedPtr = std::shared_ptr; using ProtoLocalClusterRateLimit = envoy::extensions::common::ratelimit::v3::LocalClusterRateLimit; +class DynamicDescriptor : public Logger::Loggable { +public: + DynamicDescriptor(uint64_t max_tokens, uint64_t tokens_per_fill, + std::chrono::milliseconds fill_interval, uint32_t lru_size, TimeSource&); + // add a new user configured descriptor to the set. + RateLimitTokenBucketSharedPtr addOrGetDescriptor(const RateLimit::Descriptor& request_descriptor); + +private: + using LruList = std::list; + + mutable absl::Mutex dyn_desc_lock_; + RateLimit::Descriptor::Map> + dynamic_descriptors_ ABSL_GUARDED_BY(dyn_desc_lock_); + + uint64_t max_tokens_; + uint64_t tokens_per_fill_; + const std::chrono::milliseconds fill_interval_; + LruList lru_list_; + uint32_t lru_size_; + TimeSource& time_source_; +}; + +using DynamicDescriptorSharedPtr = std::shared_ptr; + +class DynamicDescriptorMap : public Logger::Loggable { +public: + // add a new user configured descriptor to the set. 
+ void addDescriptor(const RateLimit::LocalDescriptor& descriptor, + DynamicDescriptorSharedPtr dynamic_descriptor); + // pass request_descriptors to the dynamic descriptor set to get the token bucket. + RateLimitTokenBucketSharedPtr getBucket(const RateLimit::Descriptor); + +private: + bool matchDescriptorEntries(const std::vector& request_entries, + const std::vector& user_entries); + RateLimit::LocalDescriptor::Map config_descriptors_; +}; + class ShareProvider { public: virtual ~ShareProvider() = default; @@ -66,9 +107,8 @@ class TokenBucketContext { virtual uint64_t remainingTokens() const PURE; }; -class LocalRateLimiterImpl; - -class RateLimitTokenBucket : public TokenBucketContext { +class RateLimitTokenBucket : public TokenBucketContext, + public Logger::Loggable { public: RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill, std::chrono::milliseconds fill_interval, TimeSource& time_source); @@ -76,6 +116,7 @@ class RateLimitTokenBucket : public TokenBucketContext { // RateLimitTokenBucket bool consume(double factor = 1.0, uint64_t tokens = 1); double fillRate() const { return token_bucket_.fillRate(); } + std::chrono::milliseconds fillInterval() const { return fill_interval_; } uint64_t maxTokens() const override { return static_cast(token_bucket_.maxTokens()); } uint64_t remainingTokens() const override { @@ -84,14 +125,15 @@ class RateLimitTokenBucket : public TokenBucketContext { private: AtomicTokenBucketImpl token_bucket_; + const std::chrono::milliseconds fill_interval_; }; using RateLimitTokenBucketSharedPtr = std::shared_ptr; -class LocalRateLimiterImpl { +class LocalRateLimiterImpl : public Logger::Loggable { public: struct Result { bool allowed{}; - OptRef token_bucket_context{}; + std::shared_ptr token_bucket_context{}; }; LocalRateLimiterImpl( @@ -100,10 +142,10 @@ class LocalRateLimiterImpl { const Protobuf::RepeatedPtrField< envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors, bool 
always_consume_default_token_bucket = true, - ShareProviderSharedPtr shared_provider = nullptr); + ShareProviderSharedPtr shared_provider = nullptr, const uint32_t lru_size = 20); ~LocalRateLimiterImpl(); - Result requestAllowed(absl::Span request_descriptors) const; + Result requestAllowed(absl::Span request_descriptors); private: RateLimitTokenBucketSharedPtr default_token_bucket_; @@ -111,6 +153,7 @@ class LocalRateLimiterImpl { TimeSource& time_source_; RateLimit::LocalDescriptor::Map descriptors_; + DynamicDescriptorMap dynamic_descriptors_{}; ShareProviderSharedPtr share_provider_; mutable Thread::ThreadSynchronizer synchronizer_; // Used for testing only. diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc index 8af1dd4498072..75cce78fbfce7 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc @@ -30,12 +30,15 @@ FilterConfig::FilterConfig( PROTOBUF_GET_MS_OR_DEFAULT(config.token_bucket(), fill_interval, 0))), max_tokens_(config.token_bucket().max_tokens()), tokens_per_fill_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.token_bucket(), tokens_per_fill, 1)), + max_dynamic_descriptors_( + config.has_max_dynamic_descriptors() ? config.max_dynamic_descriptors().value() : 20), descriptors_(config.descriptors()), rate_limit_per_connection_(config.local_rate_limit_per_downstream_connection()), always_consume_default_token_bucket_( config.has_always_consume_default_token_bucket() ? 
config.always_consume_default_token_bucket().value() : true), + local_info_(context.localInfo()), runtime_(context.runtime()), filter_enabled_( config.has_filter_enabled() @@ -109,7 +112,7 @@ FilterConfig::FilterConfig( rate_limiter_ = std::make_unique( fill_interval_, max_tokens_, tokens_per_fill_, dispatcher_, descriptors_, - always_consume_default_token_bucket_, std::move(share_provider)); + always_consume_default_token_bucket_, std::move(share_provider), max_dynamic_descriptors_); } Filters::Common::LocalRateLimit::LocalRateLimiterImpl::Result @@ -196,7 +199,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, Http::FilterHeadersStatus Filter::encodeHeaders(Http::ResponseHeaderMap& headers, bool) { // We can never assume the decodeHeaders() was called before encodeHeaders(). - if (used_config_->enableXRateLimitHeaders() && token_bucket_context_.has_value()) { + if (used_config_->enableXRateLimitHeaders() && token_bucket_context_) { headers.addReferenceKey( HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitLimit, token_bucket_context_->maxTokens()); @@ -215,7 +218,7 @@ Filter::requestAllowed(absl::Span request_descripto : used_config_->requestAllowed(request_descriptors); } -const Filters::Common::LocalRateLimit::LocalRateLimiterImpl& Filter::getPerConnectionRateLimiter() { +Filters::Common::LocalRateLimit::LocalRateLimiterImpl& Filter::getPerConnectionRateLimiter() { ASSERT(used_config_->rateLimitPerConnection()); auto typed_state = @@ -225,16 +228,16 @@ const Filters::Common::LocalRateLimit::LocalRateLimiterImpl& Filter::getPerConne if (typed_state == nullptr) { auto limiter = std::make_shared( used_config_->fillInterval(), used_config_->maxTokens(), used_config_->tokensPerFill(), - decoder_callbacks_->dispatcher(), used_config_->descriptors(), - used_config_->consumeDefaultTokenBucket()); + used_config_->maxDynamicDescriptors(), decoder_callbacks_->dispatcher(), + used_config_->descriptors(), 
used_config_->consumeDefaultTokenBucket()); decoder_callbacks_->streamInfo().filterState()->setData( PerConnectionRateLimiter::key(), limiter, StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection); - return limiter->value(); + return const_cast(limiter->value()); } - return typed_state->value(); + return const_cast(typed_state->value()); } void Filter::populateDescriptors(std::vector& descriptors, diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.h b/source/extensions/filters/http/local_ratelimit/local_ratelimit.h index e8224a0b2b2f8..303b2798d61ba 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.h +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.h @@ -52,12 +52,12 @@ class PerConnectionRateLimiter : public StreamInfo::FilterState::Object { public: PerConnectionRateLimiter( const std::chrono::milliseconds& fill_interval, uint32_t max_tokens, uint32_t tokens_per_fill, - Envoy::Event::Dispatcher& dispatcher, + uint32_t max_dynamic_descriptors, Envoy::Event::Dispatcher& dispatcher, const Protobuf::RepeatedPtrField< envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptor, bool always_consume_default_token_bucket) : rate_limiter_(fill_interval, max_tokens, tokens_per_fill, dispatcher, descriptor, - always_consume_default_token_bucket) {} + always_consume_default_token_bucket, nullptr, max_dynamic_descriptors) {} static const std::string& key(); const Filters::Common::LocalRateLimit::LocalRateLimiterImpl& value() const { return rate_limiter_; @@ -99,6 +99,7 @@ class FilterConfig : public Router::RouteSpecificFilterConfig, const std::chrono::milliseconds& fillInterval() const { return fill_interval_; } uint32_t maxTokens() const { return max_tokens_; } uint32_t tokensPerFill() const { return tokens_per_fill_; } + uint32_t maxDynamicDescriptors() const { return max_dynamic_descriptors_; } const Protobuf::RepeatedPtrField< 
envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors() const { @@ -145,6 +146,7 @@ class FilterConfig : public Router::RouteSpecificFilterConfig, const std::chrono::milliseconds fill_interval_; const uint32_t max_tokens_; const uint32_t tokens_per_fill_; + const uint32_t max_dynamic_descriptors_; const Protobuf::RepeatedPtrField< envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor> descriptors_; @@ -193,7 +195,7 @@ class Filter : public Http::PassThroughFilter, Logger::Loggable& descriptors, Http::RequestHeaderMap& headers); VhRateLimitOptions getVirtualHostRateLimitOption(const Router::RouteConstSharedPtr& route); - const Filters::Common::LocalRateLimit::LocalRateLimiterImpl& getPerConnectionRateLimiter(); + Filters::Common::LocalRateLimit::LocalRateLimiterImpl& getPerConnectionRateLimiter(); Filters::Common::LocalRateLimit::LocalRateLimiterImpl::Result requestAllowed(absl::Span request_descriptors); @@ -201,7 +203,7 @@ class Filter : public Http::PassThroughFilter, Logger::Loggable token_bucket_context_; + std::shared_ptr token_bucket_context_; VhRateLimitOptions vh_rate_limits_; }; diff --git a/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc b/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc index 0e57f7b078963..637ee43e6ca6a 100644 --- a/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc +++ b/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc @@ -141,9 +141,11 @@ class LocalRateLimiterDescriptorImplTest : public LocalRateLimiterImplTest { public: void initializeWithAtomicTokenBucketDescriptor(const std::chrono::milliseconds fill_interval, const uint32_t max_tokens, - const uint32_t tokens_per_fill) { - rate_limiter_ = std::make_shared( - fill_interval, max_tokens, tokens_per_fill, dispatcher_, descriptors_); + const uint32_t tokens_per_fill, + uint32_t lru_size = 20) { + rate_limiter_ = + std::make_shared(fill_interval, max_tokens, 
tokens_per_fill, + dispatcher_, descriptors_, true, nullptr, lru_size); } static constexpr absl::string_view single_descriptor_config_yaml = R"( @@ -156,6 +158,26 @@ class LocalRateLimiterDescriptorImplTest : public LocalRateLimiterImplTest { fill_interval: {} )"; + static constexpr absl::string_view wildcard_descriptor_config_yaml = R"( + entries: + - key: user + token_bucket: + max_tokens: {} + tokens_per_fill: {} + fill_interval: {} + )"; + + static constexpr absl::string_view multiple_wildcard_descriptor_config_yaml = R"( + entries: + - key: user + - key: org + value: test + token_bucket: + max_tokens: {} + tokens_per_fill: {} + fill_interval: {} + )"; + static constexpr absl::string_view multiple_descriptor_config_yaml = R"( entries: - key: hello @@ -174,6 +196,124 @@ class LocalRateLimiterDescriptorImplTest : public LocalRateLimiterImplTest { std::vector no_match_descriptor_{{{{"no_match", "no_match"}}}}; }; +// Make sure error raised in case duplicate/replicated descriptors are found. +TEST_F(LocalRateLimiterImplTest, DuplicatedDynamicTokenBucketDescriptor) { + TestUtility::loadFromYaml( + fmt::format(LocalRateLimiterDescriptorImplTest::wildcard_descriptor_config_yaml, 2, 1, "60s"), + *descriptors_.Add()); + TestUtility::loadFromYaml( + fmt::format(LocalRateLimiterDescriptorImplTest::wildcard_descriptor_config_yaml, 2, 1, "60s"), + *descriptors_.Add()); + + EXPECT_THROW_WITH_MESSAGE(LocalRateLimiterImpl(std::chrono::milliseconds(60000), 2, 1, + dispatcher_, descriptors_, true, nullptr, 1), + + EnvoyException, + "duplicate descriptor in the local rate descriptor: user="); +} + +// Verify dynamic token bucket functionality with a single entry descriptor. 
+TEST_F(LocalRateLimiterDescriptorImplTest, DynamicTokenBuckets) { + TestUtility::loadFromYaml(fmt::format(wildcard_descriptor_config_yaml, 2, 2, "1s"), + *descriptors_.Add()); + initializeWithAtomicTokenBucketDescriptor(std::chrono::milliseconds(50), 20, 2, 1); + + std::vector descriptors{{{{"user", "A"}}}}; + + // Descriptor from 2 -> 1 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors).allowed); + // Descriptor from 1 -> 0 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors).allowed); + // Descriptor from 0 -> 0 tokens + EXPECT_FALSE(rate_limiter_->requestAllowed(descriptors).allowed); + + std::vector descriptors2{{{{"user", "B"}}}}; + // Descriptor from 2 -> 1 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors2).allowed); + // Descriptor from 1 -> 0 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors2).allowed); + // Descriptor from 0 -> 0 tokens + EXPECT_FALSE(rate_limiter_->requestAllowed(descriptors2).allowed); + + // this must not be rate-limited because it will be handled by default bucket which uses + // max_tokens i.e 20 + std::vector extra_entries_descriptor{{{{"user", "C"}, {"key", "value"}}}}; + EXPECT_TRUE(rate_limiter_->requestAllowed(extra_entries_descriptor).allowed); + EXPECT_TRUE(rate_limiter_->requestAllowed(extra_entries_descriptor).allowed); + EXPECT_TRUE(rate_limiter_->requestAllowed(extra_entries_descriptor).allowed); + + // this must not be rate-limited because it will be handled by default bucket which uses + // max_tokens i.e 20 + std::vector different_key_descriptor{{{{"notuser", "A"}}}}; + EXPECT_TRUE(rate_limiter_->requestAllowed(different_key_descriptor).allowed); + EXPECT_TRUE(rate_limiter_->requestAllowed(different_key_descriptor).allowed); + EXPECT_TRUE(rate_limiter_->requestAllowed(different_key_descriptor).allowed); +} + +// Verify dynamic token bucket functionality with multiple entries descriptor. 
+TEST_F(LocalRateLimiterDescriptorImplTest, DynamicTokenBucketswildcardWithMultipleEntries) { + TestUtility::loadFromYaml(fmt::format(multiple_wildcard_descriptor_config_yaml, 2, 2, "1s"), + *descriptors_.Add()); + initializeWithAtomicTokenBucketDescriptor(std::chrono::milliseconds(50), 20, 2, 1); + + std::vector descriptors{{{{"user", "A"}}}}; + // Descriptor from 2 -> 2 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors).allowed); + // Descriptor from 2 -> 2 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors).allowed); + // Descriptor from 2 -> 2 tokens. This is to check if the tokens are not consumed + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors).allowed); + + // same size entries but non-matching key for wildcard descriptor. Should not be rate-limited. + std::vector descriptors2{{{{"user", "A"}, {"key", "value"}}}}; + // Descriptor from 2 -> 2 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors2).allowed); + // Descriptor from 2 -> 2 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors2).allowed); + // Descriptor from 2 -> 2 tokens. This is to check if the tokens are not consumed + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors2).allowed); + + // same size entries but non-wildcard key's values does not match. Should not be rate-limited. + std::vector descriptors3{{{{"user", "A"}, {"org", "not-test"}}}}; + // Descriptor from 2 -> 2 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors3).allowed); + // Descriptor from 2 -> 2 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors3).allowed); + // Descriptor from 2 -> 2 tokens. 
This is to check if the tokens are not consumed + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors3).allowed); + + // this must be rate-limited because non-wildcard key-value matches and wildcard key matches + std::vector descriptors4{{{{"user", "A"}, {"org", "test"}}}}; + // Descriptor from 2 -> 1 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors4).allowed); + // Descriptor from 1 -> 0 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors4).allowed); + // Descriptor from 0 -> 0 tokens + EXPECT_FALSE(rate_limiter_->requestAllowed(descriptors4).allowed); +} + +TEST_F(LocalRateLimiterDescriptorImplTest, DynamicTokenBucketsMixedRequestOrder) { + TestUtility::loadFromYaml(fmt::format(wildcard_descriptor_config_yaml, 2, 2, "1s"), + *descriptors_.Add()); + initializeWithAtomicTokenBucketDescriptor(std::chrono::milliseconds(50), 4, 2, 2); + + std::vector descriptors{{{{"user", "A"}}}}; + std::vector descriptors2{{{{"user", "B"}}}}; + + // Descriptor from 2 -> 1 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors).allowed); + // Descriptor from 2 -> 1 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors2).allowed); + // Descriptor from 1 -> 0 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors).allowed); + // Descriptor from 1 -> 0 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(descriptors2).allowed); + // Descriptor from 0 -> 0 tokens + EXPECT_FALSE(rate_limiter_->requestAllowed(descriptors).allowed); + // Descriptor from 0 -> 0 tokens + EXPECT_FALSE(rate_limiter_->requestAllowed(descriptors2).allowed); +} + // Verify descriptor rate limit time with small fill interval is rejected. TEST_F(LocalRateLimiterDescriptorImplTest, DescriptorRateLimitSmallFillInterval) { // Set fill interval to 10 milliseconds. @@ -551,7 +691,7 @@ TEST_F(LocalRateLimiterDescriptorImplTest, NullDefaultTokenBucket) { // Not match any descriptor and default token bucket is null. 
auto no_match_result = rate_limiter_->requestAllowed(no_match_descriptor_); EXPECT_TRUE(no_match_result.allowed); - EXPECT_FALSE(no_match_result.token_bucket_context.has_value()); + EXPECT_FALSE(no_match_result.token_bucket_context); } } // Namespace LocalRateLimit diff --git a/test/extensions/filters/http/local_ratelimit/BUILD b/test/extensions/filters/http/local_ratelimit/BUILD index 56185a9bff3aa..366f0e675f80e 100644 --- a/test/extensions/filters/http/local_ratelimit/BUILD +++ b/test/extensions/filters/http/local_ratelimit/BUILD @@ -52,5 +52,6 @@ envoy_extension_cc_test( deps = [ "//source/extensions/filters/http/local_ratelimit:config", "//test/integration:http_protocol_integration_lib", + "//test/test_common:test_runtime_lib", ], ) diff --git a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc index 22de3f5089ae1..d3fa29ea12e09 100644 --- a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc +++ b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc @@ -1,6 +1,7 @@ #include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h" #include "test/integration/http_protocol_integration.h" +#include "test/test_common/test_runtime.h" #include "gtest/gtest.h" @@ -140,6 +141,40 @@ class LocalRateLimitFilterIntegrationTest : public Event::TestUsingSimulatedTime } } + IntegrationStreamDecoderPtr makeRequest(const std::string& cluster, + const std::string& path = "/test/long/url") { + return codec_client_->makeRequestWithBody( + Http::TestRequestHeaderMapImpl{{":method", "GET"}, + {":path", path}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-envoy-downstream-service-cluster", cluster}}, + 0); + } + + void verifyResponse(IntegrationStreamDecoderPtr response, const std::string& expected_status, + size_t expected_body_size) { + ASSERT_TRUE(response->waitForEndStream()); + 
EXPECT_TRUE(response->complete()); + EXPECT_EQ(expected_status, response->headers().getStatusValue()); + EXPECT_EQ(expected_body_size, response->body().size()); + } + + void sendAndVerifyRequest(const std::string& cluster, const std::string& expected_status, + size_t expected_body_size) { + auto response = makeRequest(cluster); + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(default_response_headers_, 1); + verifyResponse(std::move(response), expected_status, expected_body_size); + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_EQ(0U, upstream_request_->bodyLength()); + } + void sendRateLimitedRequest(const std::string& cluster) { + auto response = makeRequest(cluster); + verifyResponse(std::move(response), "429", + 18); // 18 is the expected body size for rate-limited responses. + } + static constexpr absl::string_view filter_config_ = R"EOF( name: envoy.filters.http.local_ratelimit @@ -168,6 +203,47 @@ name: envoy.filters.http.local_ratelimit local_rate_limit_per_downstream_connection: {} )EOF"; + static constexpr absl::string_view filter_config_with_blank_value_descriptor_ = + R"EOF( +name: envoy.filters.http.local_ratelimit +typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit + stat_prefix: http_local_rate_limiter + max_dynamic_descriptors: {} + token_bucket: + max_tokens: 2 + tokens_per_fill: 1 + fill_interval: 1000s + filter_enabled: + runtime_key: local_rate_limit_enabled + default_value: + numerator: 100 + denominator: HUNDRED + filter_enforced: + runtime_key: local_rate_limit_enforced + default_value: + numerator: 100 + denominator: HUNDRED + response_headers_to_add: + - append_action: OVERWRITE_IF_EXISTS_OR_ADD + header: + key: x-local-rate-limit + value: 'true' + descriptors: + - entries: + - key: client_cluster + token_bucket: + max_tokens: 1 + tokens_per_fill: 1 + fill_interval: 1000s + rate_limits: + - actions: # any actions in here + - request_headers: + header_name:
x-envoy-downstream-service-cluster + descriptor_key: client_cluster + local_rate_limit_per_downstream_connection: {} +)EOF"; + const std::string filter_config_with_local_cluster_rate_limit_ = R"EOF( name: envoy.filters.http.local_ratelimit @@ -301,6 +377,66 @@ INSTANTIATE_TEST_SUITE_P( testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParamsWithoutHTTP3()), HttpProtocolIntegrationTest::protocolTestParamsToString); +TEST_P(LocalRateLimitFilterIntegrationTest, DynamicDescriptorsBasicTest) { + initializeFilter(fmt::format(filter_config_with_blank_value_descriptor_, 20, "false")); + // filter is adding dynamic descriptors based on the request header + // 'x-envoy-downstream-service-cluster' and the token bucket is set to 1 token per fill interval + // of 1000s which means only one request is allowed per 1000s for each unique value of + // 'x-envoy-downstream-service-cluster' header. + + codec_client_ = makeHttpConnection(lookupPort("http")); + sendAndVerifyRequest("foo", "200", 0); + cleanupUpstreamAndDownstream(); + + // 1 token is exhausted for 'foo' cluster, so the next request with the same cluster should be + // rate limited. + codec_client_ = makeHttpConnection(lookupPort("http")); + sendRateLimitedRequest("foo"); + cleanupUpstreamAndDownstream(); + + // The next request with a different cluster, 'bar', should be allowed. + codec_client_ = makeHttpConnection(lookupPort("http")); + sendAndVerifyRequest("bar", "200", 0); + cleanupUpstreamAndDownstream(); + + // 1 token is exhausted for 'bar' cluster as well, so the next request with the same cluster + // should be rate limited.
+ codec_client_ = makeHttpConnection(lookupPort("http")); + sendRateLimitedRequest("bar"); + cleanupUpstreamAndDownstream(); +} + +TEST_P(LocalRateLimitFilterIntegrationTest, DescriptorsBasicTestWithMinimumMaxDynamicDescriptors) { + auto max_dynamic_descriptors = 1; + initializeFilter( + fmt::format(filter_config_with_blank_value_descriptor_, max_dynamic_descriptors, "false")); + // filter is adding dynamic descriptors based on the request header + // 'x-envoy-downstream-service-cluster' and the token bucket is set to 1 token per fill interval + // of 1000s which means only one request is allowed per 1000s for each unique value of + // 'x-envoy-downstream-service-cluster' header. + + codec_client_ = makeHttpConnection(lookupPort("http")); + sendAndVerifyRequest("foo", "200", 0); + cleanupUpstreamAndDownstream(); + + // 1 token is exhausted for 'foo' cluster, so the next request with the same cluster should be + // rate limited. + codec_client_ = makeHttpConnection(lookupPort("http")); + sendRateLimitedRequest("foo"); + cleanupUpstreamAndDownstream(); + + // The next request with a different cluster, 'bar', should be allowed. + codec_client_ = makeHttpConnection(lookupPort("http")); + sendAndVerifyRequest("bar", "200", 0); + cleanupUpstreamAndDownstream(); + + // 1 token is exhausted for 'bar' cluster as well, so the next request with the same cluster + // should be rate limited. + codec_client_ = makeHttpConnection(lookupPort("http")); + sendRateLimitedRequest("bar"); + cleanupUpstreamAndDownstream(); +} + TEST_P(LocalRateLimitFilterIntegrationTest, DenyRequestPerProcess) { initializeFilter(fmt::format(filter_config_, "false"));