From 72331b4e2a59f2220ff3aeb01ec4e4191ac291f4 Mon Sep 17 00:00:00 2001 From: zirain Date: Thu, 15 May 2025 21:51:55 +0800 Subject: [PATCH 01/13] local_ratelimit: support x-ratelimit-reset header Signed-off-by: zirain --- source/common/common/token_bucket_impl.cc | 9 +++++++++ source/common/common/token_bucket_impl.h | 6 ++++++ .../common/local_ratelimit/local_ratelimit_impl.h | 5 +++++ .../filters/http/local_ratelimit/local_ratelimit.cc | 5 +++++ 4 files changed, 25 insertions(+) diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 711cebbae33ff..67953e5f41e33 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -103,4 +103,13 @@ double AtomicTokenBucketImpl::timeNowInSeconds() const { return std::chrono::duration(time_source_.monotonicTime().time_since_epoch()).count(); } +std::chrono::milliseconds AtomicTokenBucketImpl::nextTokenAvailable() const { + // If there are tokens available, return immediately. + if (remainingTokens() >= 1) { + return std::chrono::milliseconds(0); + } + + return std::chrono::milliseconds(static_cast(std::ceil((1 / fill_rate_) * 1000))); +} + } // namespace Envoy diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index df45eefa42b0b..bb52d93e5afbf 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -115,6 +115,12 @@ class AtomicTokenBucketImpl { */ double remainingTokens() const; + /** + * Get the time to next token available. This is a snapshot and may change after the call. + * @return the time to next token available. + */ + std::chrono::milliseconds nextTokenAvailable() const; + private: double timeNowInSeconds() const; diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h index cc3339e6580e2..10a6b7251a605 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include "envoy/event/dispatcher.h" @@ -105,6 +106,7 @@ class TokenBucketContext { virtual uint64_t maxTokens() const PURE; virtual uint64_t remainingTokens() const PURE; + virtual uint64_t resetSeconds() const PURE; }; class RateLimitTokenBucket : public TokenBucketContext, @@ -122,6 +124,9 @@ class RateLimitTokenBucket : public TokenBucketContext, uint64_t remainingTokens() const override { return static_cast(token_bucket_.remainingTokens()); } + uint64_t resetSeconds() const override { + return static_cast(token_bucket_.nextTokenAvailable().count() / 1000); + } private: AtomicTokenBucketImpl token_bucket_; diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc index 0068f9706f52b..a992659b3f82d 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc @@ -206,6 +206,11 @@ Http::FilterHeadersStatus Filter::encodeHeaders(Http::ResponseHeaderMap& headers headers.addReferenceKey( HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitRemaining, token_bucket_context_->remainingTokens()); + if (token_bucket_context_->remainingTokens() == 0) { + headers.addReferenceKey( + HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset, + token_bucket_context_->resetSeconds()); + } } return Http::FilterHeadersStatus::Continue; From dad3403a327fbfac7e48d81cc36ecf67ccd44d7d Mon Sep 17 00:00:00 2001 From: zirain Date: Fri, 16 May 2025 10:28:50 +0800 Subject: [PATCH 02/13] add test Signed-off-by: zirain --- .../local_ratelimit_integration_test.cc | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc index 2a77679c8105a..7d67d5f72810f 100644 --- a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc +++ b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc @@ -1,4 +1,5 @@ #include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h" +#include "source/extensions/filters/http/common/ratelimit_headers.h" #include "test/integration/http_protocol_integration.h" #include "test/test_common/test_runtime.h" @@ -203,6 +204,36 @@ name: envoy.filters.http.local_ratelimit local_rate_limit_per_downstream_connection: {} )EOF"; + +static constexpr absl::string_view limit_header_filter_config_ = + R"EOF( +name: envoy.filters.http.local_ratelimit +typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit + stat_prefix: http_local_rate_limiter + enableXRatelimitHeaders: DRAFT_VERSION_03 + token_bucket: + max_tokens: 1 + tokens_per_fill: 1 + fill_interval: 1000s + filter_enabled: + runtime_key: local_rate_limit_enabled + default_value: + numerator: 100 + denominator: HUNDRED + filter_enforced: + runtime_key: local_rate_limit_enforced + default_value: + numerator: 100 + denominator: HUNDRED + response_headers_to_add: + - append_action: OVERWRITE_IF_EXISTS_OR_ADD + header: + key: x-local-rate-limit + value: 'true' + local_rate_limit_per_downstream_connection: {} +)EOF"; + static constexpr absl::string_view filter_config_with_blank_value_descriptor_ = R"EOF( name: envoy.filters.http.local_ratelimit @@ -489,6 +520,43 @@ TEST_P(LocalRateLimitFilterIntegrationTest, DenyRequestWithinSameConnection) { EXPECT_EQ(18, response->body().size()); } +TEST_P(LocalRateLimitFilterIntegrationTest, LimitHeaderTest) { + initializeFilter(fmt::format(limit_header_filter_config_, "true")); + + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = codec_client_->makeRequestWithBody(default_request_headers_, 0); + + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(default_response_headers_, 1); + + ASSERT_TRUE(response->waitForEndStream()); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_EQ(0U, upstream_request_->bodyLength()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + EXPECT_EQ(0, response->body().size()); + EXPECT_THAT( + response->headers(), + Http::HeaderValueOf( + Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitLimit, "1")); + + response = codec_client_->makeRequestWithBody(default_request_headers_, 0); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("429", response->headers().getStatusValue()); + EXPECT_EQ(18, response->body().size()); + EXPECT_THAT( + response->headers(), + Http::HeaderValueOf( + Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitRemaining, "0")); + EXPECT_THAT( + response->headers(), + Http::HeaderValueOf( + Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset, "1000")); +} + TEST_P(LocalRateLimitFilterIntegrationTest, PermitRequestAcrossDifferentConnections) { initializeFilter(fmt::format(filter_config_, "true")); @@ -562,7 +630,6 @@ TEST_P(LocalRateLimitFilterIntegrationTest, BasicTestPerRouteAndRds) { EXPECT_TRUE(response->complete()); EXPECT_EQ("200", response->headers().getStatusValue()); EXPECT_EQ(0, response->body().size()); - cleanupUpstreamAndDownstream(); cleanUpXdsConnection(); From 57a5bea2d22dd0ec69edada35b5053ba0a625896 Mon Sep 17 00:00:00 2001 From: zirain Date: Fri, 16 May 2025 10:32:02 +0800 Subject: [PATCH 03/13] change log Signed-off-by: zirain --- changelogs/current.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 00215082a60c2..4c10819ab93a0 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -149,5 +149,8 @@ new_features: added :ref:`enable_ja4_fingerprinting ` to create a JA4 fingerprint hash from the Client Hello message. +- area: local_ratelimit + change: | + ``local_ratelimit`` will return ``x-ratelimit-reset`` header when the rate limit is exceeded. deprecated: From 75c20a134d6d35a459fb264ad014630917bc7e94 Mon Sep 17 00:00:00 2001 From: zirain Date: Fri, 16 May 2025 10:51:54 +0800 Subject: [PATCH 04/13] nit Signed-off-by: zirain --- .../filters/common/local_ratelimit/local_ratelimit_impl.h | 1 - 1 file changed, 1 deletion(-) diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h index 10a6b7251a605..8df706a04b59c 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include "envoy/event/dispatcher.h" From 871c82e91f1391e4166ed9ee39eeda8dd03aa226 Mon Sep 17 00:00:00 2001 From: zirain Date: Fri, 16 May 2025 12:37:12 +0800 Subject: [PATCH 05/13] format Signed-off-by: zirain --- .../local_ratelimit_integration_test.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc index 7d67d5f72810f..9c98be9c2bb09 100644 --- a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc +++ b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc @@ -204,8 +204,7 @@ name: envoy.filters.http.local_ratelimit local_rate_limit_per_downstream_connection: {} )EOF"; - -static constexpr absl::string_view limit_header_filter_config_ = + static constexpr absl::string_view limit_header_filter_config_ = R"EOF( name: envoy.filters.http.local_ratelimit typed_config: @@ -539,7 +538,8 @@ TEST_P(LocalRateLimitFilterIntegrationTest, LimitHeaderTest) { EXPECT_THAT( response->headers(), Http::HeaderValueOf( - Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitLimit, "1")); + Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitLimit, + "1")); response = codec_client_->makeRequestWithBody(default_request_headers_, 0); @@ -550,11 +550,13 @@ TEST_P(LocalRateLimitFilterIntegrationTest, LimitHeaderTest) { EXPECT_THAT( response->headers(), Http::HeaderValueOf( - Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitRemaining, "0")); + Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitRemaining, + "0")); EXPECT_THAT( response->headers(), Http::HeaderValueOf( - Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset, "1000")); + Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset, + "1000")); } TEST_P(LocalRateLimitFilterIntegrationTest, PermitRequestAcrossDifferentConnections) { From fc9fe5f9a56be8c2fe2f5c7a639a3a739ab68b12 Mon Sep 17 00:00:00 2001 From: zirain Date: Fri, 16 May 2025 21:02:52 +0800 Subject: [PATCH 06/13] update Signed-off-by: zirain --- source/common/common/token_bucket_impl.cc | 15 ++++++++++++--- source/common/common/token_bucket_impl.h | 9 ++++++--- .../local_ratelimit/local_ratelimit_impl.cc | 2 +- .../common/local_ratelimit/local_ratelimit_impl.h | 2 +- .../http/local_ratelimit/local_ratelimit.cc | 8 +++----- .../http/rate_limit_quota/global_client_impl.cc | 6 +++--- .../filters/http/rate_limit_quota/filter_test.cc | 4 ++-- 7 files changed, 28 insertions(+), 18 deletions(-) diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 67953e5f41e33..d2158dcff9cb4 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -8,6 +8,7 @@ namespace Envoy { namespace { // The minimal fill rate will be one second every year. constexpr double kMinFillRate = 1.0 / (365 * 24 * 60 * 60); +constexpr auto MILLISECONDS_PER_SECOND = 1000; } // namespace @@ -59,13 +60,16 @@ void TokenBucketImpl::maybeReset(uint64_t num_tokens) { } AtomicTokenBucketImpl::AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, + std::chrono::milliseconds fill_interval, double fill_rate, bool init_fill) - : AtomicTokenBucketImpl::AtomicTokenBucketImpl(max_tokens, time_source, fill_rate, - (init_fill) ? max_tokens : 0) {} + : AtomicTokenBucketImpl::AtomicTokenBucketImpl(max_tokens, time_source, fill_interval, + fill_rate, (init_fill) ? max_tokens : 0) {} AtomicTokenBucketImpl::AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, + std::chrono::milliseconds fill_interval, double fill_rate, uint64_t initial_tokens) : max_tokens_(max_tokens), fill_rate_(std::max(std::abs(fill_rate), kMinFillRate)), + fill_interval_(std::chrono::duration(fill_interval).count()), time_source_(time_source) { auto time_in_seconds = timeNowInSeconds(); if (initial_tokens) { @@ -109,7 +113,12 @@ std::chrono::milliseconds AtomicTokenBucketImpl::nextTokenAvailable() const { return std::chrono::milliseconds(0); } - return std::chrono::milliseconds(static_cast(std::ceil((1 / fill_rate_) * 1000))); + // Calculate time since the last fill + double current_time = timeNowInSeconds(); + double time_since_last_fill = std::fmod(current_time, fill_interval_); + double time_until_next_fill = fill_interval_ - time_since_last_fill; + return std::chrono::milliseconds( + static_cast(time_until_next_fill * MILLISECONDS_PER_SECOND)); } } // namespace Envoy diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index bb52d93e5afbf..a588c13162f76 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -48,8 +48,10 @@ class AtomicTokenBucketImpl { * @param init_fill supplies whether the bucket should be initialized with max_tokens. */ explicit AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, - double fill_rate = 1.0, bool init_fill = true); - explicit AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate, + std::chrono::milliseconds fill_interval, double fill_rate = 1.0, + bool init_fill = true); + explicit AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, + std::chrono::milliseconds fill_interval, double fill_rate, uint64_t initial_tokens); // This reference https://github.com/facebook/folly/blob/main/folly/TokenBucket.h. @@ -126,8 +128,9 @@ class AtomicTokenBucketImpl { const double max_tokens_; const double fill_rate_; + const double fill_interval_; - std::atomic time_in_seconds_{}; + std::atomic time_in_seconds_; TimeSource& time_source_; }; diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc index 1ff6b295b0d52..4971c1a86a584 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc @@ -79,7 +79,7 @@ ShareProviderManagerSharedPtr ShareProviderManager::singleton(Event::Dispatcher& RateLimitTokenBucket::RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill, std::chrono::milliseconds fill_interval, TimeSource& time_source) - : token_bucket_(max_tokens, time_source, + : token_bucket_(max_tokens, time_source, fill_interval, // Calculate the fill rate in tokens per second. tokens_per_fill / std::chrono::duration(fill_interval).count()), fill_interval_(fill_interval) {} diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h index 8df706a04b59c..05adf8a558fc6 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h @@ -124,7 +124,7 @@ class RateLimitTokenBucket : public TokenBucketContext, return static_cast(token_bucket_.remainingTokens()); } uint64_t resetSeconds() const override { - return static_cast(token_bucket_.nextTokenAvailable().count() / 1000); + return static_cast(std::ceil(token_bucket_.nextTokenAvailable().count() / 1000)); } private: diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc index a992659b3f82d..36fc416ae8b91 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc @@ -206,11 +206,9 @@ Http::FilterHeadersStatus Filter::encodeHeaders(Http::ResponseHeaderMap& headers headers.addReferenceKey( HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitRemaining, token_bucket_context_->remainingTokens()); - if (token_bucket_context_->remainingTokens() == 0) { - headers.addReferenceKey( - HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset, - token_bucket_context_->resetSeconds()); - } + headers.addReferenceKey( + HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset, + token_bucket_context_->resetSeconds()); } return Http::FilterHeadersStatus::Continue; diff --git a/source/extensions/filters/http/rate_limit_quota/global_client_impl.cc b/source/extensions/filters/http/rate_limit_quota/global_client_impl.cc index 426f4546790fd..ef41067139231 100644 --- a/source/extensions/filters/http/rate_limit_quota/global_client_impl.cc +++ b/source/extensions/filters/http/rate_limit_quota/global_client_impl.cc @@ -251,9 +251,9 @@ createTokenBucketFromAction(const RateLimitStrategy& strategy, TimeSource& time_ ? max_tokens * (existing_token_bucket->remainingTokens() / existing_token_bucket->maxTokens()) : max_tokens; - - return std::make_shared(max_tokens, time_source, fill_rate_per_sec, - initial_tokens); + return std::make_shared( + max_tokens, time_source, std::chrono::milliseconds(fill_interval_sec * 1000), + fill_rate_per_sec, initial_tokens); } void GlobalRateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) { diff --git a/test/extensions/filters/http/rate_limit_quota/filter_test.cc b/test/extensions/filters/http/rate_limit_quota/filter_test.cc index 96cdd58b18195..77e1a6d8b0bed 100644 --- a/test/extensions/filters/http/rate_limit_quota/filter_test.cc +++ b/test/extensions/filters/http/rate_limit_quota/filter_test.cc @@ -598,7 +598,7 @@ TEST_F(FilterTest, DecodeHeaderWithTokenBucketAllow) { token_bucket->mutable_fill_interval()->set_seconds(60); // 100 available tokens so the test doesn't get throttled. std::shared_ptr token_bucket_limiter = - std::make_shared(100, dispatcher_.timeSource(), 100 / 60); + std::make_shared(100, dispatcher_.timeSource(), 60 * 1000, 100 / 60); RateLimitQuotaResponse::BucketAction no_assignment_action; no_assignment_action.mutable_quota_assignment_action() @@ -643,7 +643,7 @@ TEST_F(FilterTest, DecodeHeaderWithTokenBucketDeny) { token_bucket->mutable_tokens_per_fill()->set_value(1); token_bucket->mutable_fill_interval()->set_seconds(60); std::shared_ptr token_bucket_limiter = - std::make_shared(1, dispatcher_.timeSource(), 1 / 60); + std::make_shared(1, dispatcher_.timeSource(), 60 * 1000, 1 / 60); // All subsequent requests should deny for 60 (mock) seconds. EXPECT_TRUE(token_bucket_limiter->consume()); From 328d697bb0d03920c295a100112c83e9358a4741 Mon Sep 17 00:00:00 2001 From: zirain Date: Fri, 16 May 2025 22:32:30 +0800 Subject: [PATCH 07/13] update test Signed-off-by: zirain --- .../local_ratelimit_integration_test.cc | 99 ++++++++++++------- 1 file changed, 63 insertions(+), 36 deletions(-) diff --git a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc index 9c98be9c2bb09..5dae484fc4023 100644 --- a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc +++ b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc @@ -1,3 +1,5 @@ +#include + #include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h" #include "source/extensions/filters/http/common/ratelimit_headers.h" @@ -160,6 +162,29 @@ class LocalRateLimitFilterIntegrationTest : public Event::TestUsingSimulatedTime EXPECT_EQ(expected_status, response->headers().getStatusValue()); EXPECT_EQ(expected_body_size, response->body().size()); } + void verifyResponse(IntegrationStreamDecoderPtr response, const std::string& expected_status, + size_t expected_body_size, const std::string& expected_limit, + const std::string& expected_remaining, const std::string& expected_reset) { + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ(expected_status, response->headers().getStatusValue()); + EXPECT_EQ(expected_body_size, response->body().size()); + EXPECT_THAT( + response->headers(), + Http::HeaderValueOf( + Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitLimit, + expected_limit)); + EXPECT_THAT( + response->headers(), + Http::HeaderValueOf(Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get() + .XRateLimitRemaining, + expected_remaining)); + EXPECT_THAT( + response->headers(), + Http::HeaderValueOf( + Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset, + expected_reset)); + } void sendAndVerifyRequest(const std::string& cluster, const std::string& expected_status, size_t expected_body_size) { @@ -170,11 +195,29 @@ class LocalRateLimitFilterIntegrationTest : public Event::TestUsingSimulatedTime EXPECT_TRUE(upstream_request_->complete()); EXPECT_EQ(0U, upstream_request_->bodyLength()); } + void sendAndVerifyRequest(const std::string& expected_limit, + const std::string& expected_remaining, + const std::string& expected_reset) { + auto response = codec_client_->makeRequestWithBody(default_request_headers_, 0); + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(default_response_headers_, 1); + verifyResponse(std::move(response), "200", 0, expected_limit, expected_remaining, + expected_reset); + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_EQ(0U, upstream_request_->bodyLength()); + } void sendRateLimitedRequest(const std::string& cluster) { auto response = makeRequest(cluster); verifyResponse(std::move(response), "429", 18); // 18 is the expected body size for rate-limited responses. } + void sendRateLimitedRequest(const std::string& expected_limit, + const std::string& expected_remaining, + const std::string& expected_reset) { + auto response = codec_client_->makeRequestWithBody(default_request_headers_, 0); + verifyResponse(std::move(response), "429", 18, expected_limit, expected_remaining, + expected_reset); + } static constexpr absl::string_view filter_config_ = R"EOF( @@ -212,9 +255,9 @@ name: envoy.filters.http.local_ratelimit stat_prefix: http_local_rate_limiter enableXRatelimitHeaders: DRAFT_VERSION_03 token_bucket: - max_tokens: 1 - tokens_per_fill: 1 - fill_interval: 1000s + max_tokens: 2 + tokens_per_fill: 2 + fill_interval: 4s filter_enabled: runtime_key: local_rate_limit_enabled default_value: @@ -519,44 +562,28 @@ TEST_P(LocalRateLimitFilterIntegrationTest, DenyRequestWithinSameConnection) { EXPECT_EQ(18, response->body().size()); } -TEST_P(LocalRateLimitFilterIntegrationTest, LimitHeaderTest) { - initializeFilter(fmt::format(limit_header_filter_config_, "true")); +TEST_P(LocalRateLimitFilterIntegrationTest, HeaderTest) { + initializeFilter(fmt::format(limit_header_filter_config_, "false")); + // The first request should be allowed and the response should contain codec_client_ = makeHttpConnection(lookupPort("http")); - auto response = codec_client_->makeRequestWithBody(default_request_headers_, 0); - - waitForNextUpstreamRequest(); - upstream_request_->encodeHeaders(default_response_headers_, 1); - - ASSERT_TRUE(response->waitForEndStream()); + sendAndVerifyRequest("2", "1", "0"); + cleanupUpstreamAndDownstream(); - EXPECT_TRUE(upstream_request_->complete()); - EXPECT_EQ(0U, upstream_request_->bodyLength()); - EXPECT_TRUE(response->complete()); - EXPECT_EQ("200", response->headers().getStatusValue()); - EXPECT_EQ(0, response->body().size()); - EXPECT_THAT( - response->headers(), - Http::HeaderValueOf( - Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitLimit, - "1")); + // Max tokens is 2, the second request should be allowed. + codec_client_ = makeHttpConnection(lookupPort("http")); + sendAndVerifyRequest("2", "0", "4"); + cleanupUpstreamAndDownstream(); - response = codec_client_->makeRequestWithBody(default_request_headers_, 0); + // The third request should be rate limited, x-ratelimit-reset should be 4s. + codec_client_ = makeHttpConnection(lookupPort("http")); + sendRateLimitedRequest("2", "0", "4"); + cleanupUpstreamAndDownstream(); - ASSERT_TRUE(response->waitForEndStream()); - EXPECT_TRUE(response->complete()); - EXPECT_EQ("429", response->headers().getStatusValue()); - EXPECT_EQ(18, response->body().size()); - EXPECT_THAT( - response->headers(), - Http::HeaderValueOf( - Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitRemaining, - "0")); - EXPECT_THAT( - response->headers(), - Http::HeaderValueOf( - Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset, - "1000")); + // After 1s, the forth request should be rate limited, x-ratelimit-reset should be 3s. + simTime().advanceTimeWait(std::chrono::seconds(1)); + codec_client_ = makeHttpConnection(lookupPort("http")); + sendRateLimitedRequest("2", "0", "3"); } TEST_P(LocalRateLimitFilterIntegrationTest, PermitRequestAcrossDifferentConnections) { From 9c6fb4041fe9144b3a6235fd342a50f010340cfe Mon Sep 17 00:00:00 2001 From: zirain Date: Sat, 17 May 2025 07:41:41 +0800 Subject: [PATCH 08/13] fix token_bucket_impl_test Signed-off-by: zirain --- test/common/common/token_bucket_impl_test.cc | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index ce20de2cfb28d..ed67529019954 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -134,20 +134,22 @@ class AtomicTokenBucketImplTest : public testing::Test { // Verifies TokenBucket initialization. TEST_F(AtomicTokenBucketImplTest, Initialization) { - AtomicTokenBucketImpl token_bucket{1, time_system_, -1.0}; + AtomicTokenBucketImpl token_bucket{1, time_system_,std::chrono::seconds(1), -1.0}; EXPECT_EQ(1, token_bucket.fillRate()); EXPECT_EQ(1, token_bucket.maxTokens()); EXPECT_EQ(1, token_bucket.remainingTokens()); + EXPECT_EQ(token_bucket.nextTokenAvailable(), std::chrono::milliseconds(0)); EXPECT_EQ(1, token_bucket.consume(1, false)); EXPECT_EQ(0, token_bucket.consume(1, false)); EXPECT_EQ(false, token_bucket.consume()); + EXPECT_EQ(token_bucket.nextTokenAvailable(), std::chrono::seconds(1)); } // Verifies TokenBucket's maximum capacity. TEST_F(AtomicTokenBucketImplTest, MaxBucketSize) { - AtomicTokenBucketImpl token_bucket{3, time_system_, 1}; + AtomicTokenBucketImpl token_bucket{3, time_system_, std::chrono::seconds(1), 1}; EXPECT_EQ(1, token_bucket.fillRate()); EXPECT_EQ(3, token_bucket.maxTokens()); @@ -161,7 +163,7 @@ TEST_F(AtomicTokenBucketImplTest, MaxBucketSize) { // Verifies that TokenBucket can consume tokens. TEST_F(AtomicTokenBucketImplTest, Consume) { - AtomicTokenBucketImpl token_bucket{10, time_system_, 1}; + AtomicTokenBucketImpl token_bucket{10, time_system_,std::chrono::seconds(1),1}; EXPECT_EQ(0, token_bucket.consume(20, false)); EXPECT_EQ(9, token_bucket.consume(9, false)); @@ -182,7 +184,7 @@ TEST_F(AtomicTokenBucketImplTest, Consume) { // Verifies that TokenBucket can refill tokens. TEST_F(AtomicTokenBucketImplTest, Refill) { - AtomicTokenBucketImpl token_bucket{1, time_system_, 0.5}; + AtomicTokenBucketImpl token_bucket{1, time_system_,std::chrono::seconds(1), 0.5}; EXPECT_EQ(1, token_bucket.consume(1, false)); time_system_.setMonotonicTime(std::chrono::milliseconds(500)); @@ -195,7 +197,7 @@ TEST_F(AtomicTokenBucketImplTest, Refill) { // Test partial consumption of tokens. TEST_F(AtomicTokenBucketImplTest, PartialConsumption) { - AtomicTokenBucketImpl token_bucket{16, time_system_, 16}; + AtomicTokenBucketImpl token_bucket{16, time_system_, std::chrono::seconds(1), 16}; EXPECT_EQ(16, token_bucket.consume(18, true)); time_system_.advanceTimeWait(std::chrono::milliseconds(62)); EXPECT_EQ(0, token_bucket.consume(1, true)); @@ -207,7 +209,7 @@ TEST_F(AtomicTokenBucketImplTest, PartialConsumption) { TEST_F(AtomicTokenBucketImplTest, YearlyMinRefillRate) { constexpr uint64_t seconds_per_year = 365 * 24 * 60 * 60; // Set the fill rate to be 2 years. - AtomicTokenBucketImpl token_bucket{1, time_system_, 1.0 / (seconds_per_year * 2)}; + AtomicTokenBucketImpl token_bucket{1, time_system_,std::chrono::seconds(seconds_per_year), 1.0 / (seconds_per_year * 2)}; // Consume first token. EXPECT_EQ(1, token_bucket.consume(1, false)); @@ -220,7 +222,7 @@ TEST_F(AtomicTokenBucketImplTest, YearlyMinRefillRate) { } TEST_F(AtomicTokenBucketImplTest, ConsumeNegativeTokens) { - AtomicTokenBucketImpl token_bucket{10, time_system_, 1}; + AtomicTokenBucketImpl token_bucket{10, time_system_,std::chrono::seconds(1), 1}; EXPECT_EQ(3, token_bucket.consume([](double) { return 3; })); EXPECT_EQ(7, token_bucket.remainingTokens()); @@ -229,7 +231,7 @@ TEST_F(AtomicTokenBucketImplTest, ConsumeNegativeTokens) { } TEST_F(AtomicTokenBucketImplTest, ConsumeSuperLargeTokens) { - AtomicTokenBucketImpl token_bucket{10, time_system_, 1}; + AtomicTokenBucketImpl token_bucket{10, time_system_, std::chrono::seconds(1),1}; EXPECT_EQ(100, token_bucket.consume([](double) { return 100; })); EXPECT_EQ(-90, token_bucket.remainingTokens()); @@ -239,7 +241,7 @@ TEST_F(AtomicTokenBucketImplTest, MultipleThreadsConsume) { // Real time source to ensure we will not fall into endless loop. Event::TestRealTimeSystem real_time_source; - AtomicTokenBucketImpl token_bucket{1200, time_system_, 1.0}; + AtomicTokenBucketImpl token_bucket{1200, time_system_, std::chrono::seconds(1),1.0}; // Exhaust all tokens. EXPECT_EQ(1200, token_bucket.consume(1200, false)); From 7f329172064c6904d7117bbf3d9b5b77cd7ed3dc Mon Sep 17 00:00:00 2001 From: zirain Date: Sat, 17 May 2025 08:16:03 +0800 Subject: [PATCH 09/13] more fix Signed-off-by: zirain --- test/common/common/token_bucket_impl_test.cc | 15 ++++++++------- .../filters/http/rate_limit_quota/filter_test.cc | 6 ++++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index ed67529019954..a1ba30098b0c3 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -134,7 +134,7 @@ class AtomicTokenBucketImplTest : public testing::Test { // Verifies TokenBucket initialization. TEST_F(AtomicTokenBucketImplTest, Initialization) { - AtomicTokenBucketImpl token_bucket{1, time_system_,std::chrono::seconds(1), -1.0}; + AtomicTokenBucketImpl token_bucket{1, time_system_, std::chrono::seconds(1), -1.0}; EXPECT_EQ(1, token_bucket.fillRate()); EXPECT_EQ(1, token_bucket.maxTokens()); @@ -163,7 +163,7 @@ TEST_F(AtomicTokenBucketImplTest, MaxBucketSize) { // Verifies that TokenBucket can consume tokens. TEST_F(AtomicTokenBucketImplTest, Consume) { - AtomicTokenBucketImpl token_bucket{10, time_system_,std::chrono::seconds(1),1}; + AtomicTokenBucketImpl token_bucket{10, time_system_, std::chrono::seconds(1), 1}; EXPECT_EQ(0, token_bucket.consume(20, false)); EXPECT_EQ(9, token_bucket.consume(9, false)); @@ -184,7 +184,7 @@ TEST_F(AtomicTokenBucketImplTest, Consume) { // Verifies that TokenBucket can refill tokens. TEST_F(AtomicTokenBucketImplTest, Refill) { - AtomicTokenBucketImpl token_bucket{1, time_system_,std::chrono::seconds(1), 0.5}; + AtomicTokenBucketImpl token_bucket{1, time_system_, std::chrono::seconds(1), 0.5}; EXPECT_EQ(1, token_bucket.consume(1, false)); time_system_.setMonotonicTime(std::chrono::milliseconds(500)); @@ -209,7 +209,8 @@ TEST_F(AtomicTokenBucketImplTest, PartialConsumption) { TEST_F(AtomicTokenBucketImplTest, YearlyMinRefillRate) { constexpr uint64_t seconds_per_year = 365 * 24 * 60 * 60; // Set the fill rate to be 2 years. - AtomicTokenBucketImpl token_bucket{1, time_system_,std::chrono::seconds(seconds_per_year), 1.0 / (seconds_per_year * 2)}; + AtomicTokenBucketImpl token_bucket{1, time_system_, std::chrono::seconds(seconds_per_year), + 1.0 / (seconds_per_year * 2)}; // Consume first token. EXPECT_EQ(1, token_bucket.consume(1, false)); @@ -222,7 +223,7 @@ TEST_F(AtomicTokenBucketImplTest, YearlyMinRefillRate) { } TEST_F(AtomicTokenBucketImplTest, ConsumeNegativeTokens) { - AtomicTokenBucketImpl token_bucket{10, time_system_,std::chrono::seconds(1), 1}; + AtomicTokenBucketImpl token_bucket{10, time_system_, std::chrono::seconds(1), 1}; EXPECT_EQ(3, token_bucket.consume([](double) { return 3; })); EXPECT_EQ(7, token_bucket.remainingTokens()); @@ -231,7 +232,7 @@ TEST_F(AtomicTokenBucketImplTest, ConsumeNegativeTokens) { } TEST_F(AtomicTokenBucketImplTest, ConsumeSuperLargeTokens) { - AtomicTokenBucketImpl token_bucket{10, time_system_, std::chrono::seconds(1),1}; + AtomicTokenBucketImpl token_bucket{10, time_system_, std::chrono::seconds(1), 1}; EXPECT_EQ(100, token_bucket.consume([](double) { return 100; })); EXPECT_EQ(-90, token_bucket.remainingTokens()); @@ -241,7 +242,7 @@ TEST_F(AtomicTokenBucketImplTest, MultipleThreadsConsume) { // Real time source to ensure we will not fall into endless loop. Event::TestRealTimeSystem real_time_source; - AtomicTokenBucketImpl token_bucket{1200, time_system_, std::chrono::seconds(1),1.0}; + AtomicTokenBucketImpl token_bucket{1200, time_system_, std::chrono::seconds(1), 1.0}; // Exhaust all tokens. EXPECT_EQ(1200, token_bucket.consume(1200, false)); diff --git a/test/extensions/filters/http/rate_limit_quota/filter_test.cc b/test/extensions/filters/http/rate_limit_quota/filter_test.cc index 77e1a6d8b0bed..fa401f0a1b5b6 100644 --- a/test/extensions/filters/http/rate_limit_quota/filter_test.cc +++ b/test/extensions/filters/http/rate_limit_quota/filter_test.cc @@ -598,7 +598,8 @@ TEST_F(FilterTest, DecodeHeaderWithTokenBucketAllow) { token_bucket->mutable_fill_interval()->set_seconds(60); // 100 available tokens so the test doesn't get throttled. std::shared_ptr token_bucket_limiter = - std::make_shared(100, dispatcher_.timeSource(), 60 * 1000, 100 / 60); + std::make_shared(100, dispatcher_.timeSource(), + std::chrono::seconds(60), 100 / 60); RateLimitQuotaResponse::BucketAction no_assignment_action; no_assignment_action.mutable_quota_assignment_action() @@ -643,7 +644,8 @@ TEST_F(FilterTest, DecodeHeaderWithTokenBucketDeny) { token_bucket->mutable_tokens_per_fill()->set_value(1); token_bucket->mutable_fill_interval()->set_seconds(60); std::shared_ptr token_bucket_limiter = - std::make_shared(1, dispatcher_.timeSource(), 60 * 1000, 1 / 60); + std::make_shared(1, dispatcher_.timeSource(), std::chrono::seconds(60), + 1 / 60); // All subsequent requests should deny for 60 (mock) seconds. EXPECT_TRUE(token_bucket_limiter->consume()); From 23ca425b2439c7c5c04fc95d3d7157646509421c Mon Sep 17 00:00:00 2001 From: zirain Date: Sat, 17 May 2025 14:32:55 +0800 Subject: [PATCH 10/13] fix Signed-off-by: zirain --- .../http/local_ratelimit/v3/local_rate_limit.proto | 14 ++++++++++++++ source/common/common/token_bucket_impl.cc | 2 +- .../local_ratelimit_integration_test.cc | 2 +- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto b/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto index b0199c04b7263..8306adda2919d 100644 --- a/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto +++ b/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto @@ -130,6 +130,20 @@ message LocalRateLimit { // Defines the standard version to use for X-RateLimit headers emitted by the filter. // + // * ``X-RateLimit-Limit`` - indicates the request-quota associated to the + // client in the current time-window followed by the description of the + // quota policy. + // * ``X-RateLimit-Remaining`` - indicates the remaining requests in the + // current time-window. + // * ``X-RateLimit-Reset`` - indicates the number of seconds until reset of + // the current time-window. + // + // In case rate limiting policy specifies more then one time window, the values + // above represent the window that is closest to reaching its limit. + // + // For more information about the headers specification see selected version of + // the `draft RFC `_. + // // Disabled by default. common.ratelimit.v3.XRateLimitHeadersRFCVersion enable_x_ratelimit_headers = 12 [(validate.rules).enum = {defined_only: true}]; diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index d2158dcff9cb4..d644681c76c7d 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -113,7 +113,7 @@ std::chrono::milliseconds AtomicTokenBucketImpl::nextTokenAvailable() const { return std::chrono::milliseconds(0); } - // Calculate time since the last fill + // Calculate time since the last fill. double current_time = timeNowInSeconds(); double time_since_last_fill = std::fmod(current_time, fill_interval_); double time_until_next_fill = fill_interval_ - time_since_last_fill; diff --git a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc index 5dae484fc4023..ca793507dbdca 100644 --- a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc +++ b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc @@ -565,7 +565,7 @@ TEST_P(LocalRateLimitFilterIntegrationTest, DenyRequestWithinSameConnection) { TEST_P(LocalRateLimitFilterIntegrationTest, HeaderTest) { initializeFilter(fmt::format(limit_header_filter_config_, "false")); - // The first request should be allowed and the response should contain + // The first request should be allowed. codec_client_ = makeHttpConnection(lookupPort("http")); sendAndVerifyRequest("2", "1", "0"); cleanupUpstreamAndDownstream(); From 4993162b88529cdb1a85daecd63cc67be62d8b91 Mon Sep 17 00:00:00 2001 From: zirain Date: Tue, 20 May 2025 17:51:39 +0800 Subject: [PATCH 11/13] address review comment Signed-off-by: zirain --- source/common/common/token_bucket_impl.cc | 12 ++------ source/common/common/token_bucket_impl.h | 7 ++--- .../local_ratelimit/local_ratelimit_impl.cc | 2 +- .../rate_limit_quota/global_client_impl.cc | 5 ++-- test/common/common/token_bucket_impl_test.cc | 30 +++++++++++-------- .../local_ratelimit_integration_test.cc | 10 +++---- .../http/rate_limit_quota/filter_test.cc | 6 ++-- 7 files changed, 33 insertions(+), 39 deletions(-) diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index d644681c76c7d..faec1dd82dfeb 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -8,7 +8,6 @@ namespace Envoy { namespace { // The minimal fill rate will be one second every year. constexpr double kMinFillRate = 1.0 / (365 * 24 * 60 * 60); -constexpr auto MILLISECONDS_PER_SECOND = 1000; } // namespace @@ -60,16 +59,13 @@ void TokenBucketImpl::maybeReset(uint64_t num_tokens) { } AtomicTokenBucketImpl::AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, - std::chrono::milliseconds fill_interval, double fill_rate, bool init_fill) - : AtomicTokenBucketImpl::AtomicTokenBucketImpl(max_tokens, time_source, fill_interval, - fill_rate, (init_fill) ? max_tokens : 0) {} + : AtomicTokenBucketImpl::AtomicTokenBucketImpl(max_tokens, time_source, fill_rate, + (init_fill) ? max_tokens : 0) {} AtomicTokenBucketImpl::AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, - std::chrono::milliseconds fill_interval, double fill_rate, uint64_t initial_tokens) : max_tokens_(max_tokens), fill_rate_(std::max(std::abs(fill_rate), kMinFillRate)), - fill_interval_(std::chrono::duration(fill_interval).count()), time_source_(time_source) { auto time_in_seconds = timeNowInSeconds(); if (initial_tokens) { @@ -115,10 +111,8 @@ std::chrono::milliseconds AtomicTokenBucketImpl::nextTokenAvailable() const { // Calculate time since the last fill. double current_time = timeNowInSeconds(); - double time_since_last_fill = std::fmod(current_time, fill_interval_); - double time_until_next_fill = fill_interval_ - time_since_last_fill; return std::chrono::milliseconds( - static_cast(time_until_next_fill * MILLISECONDS_PER_SECOND)); + static_cast(1 / fill_rate_ * 1000 - (current_time - time_in_seconds_.load()))); } } // namespace Envoy diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index a588c13162f76..3050c3a4f8d2b 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -48,10 +48,8 @@ class AtomicTokenBucketImpl { * @param init_fill supplies whether the bucket should be initialized with max_tokens. */ explicit AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, - std::chrono::milliseconds fill_interval, double fill_rate = 1.0, - bool init_fill = true); - explicit AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, - std::chrono::milliseconds fill_interval, double fill_rate, + double fill_rate = 1.0, bool init_fill = true); + explicit AtomicTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate, uint64_t initial_tokens); // This reference https://github.com/facebook/folly/blob/main/folly/TokenBucket.h. @@ -128,7 +126,6 @@ class AtomicTokenBucketImpl { const double max_tokens_; const double fill_rate_; - const double fill_interval_; std::atomic time_in_seconds_; TimeSource& time_source_; diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc index 4971c1a86a584..1ff6b295b0d52 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc @@ -79,7 +79,7 @@ ShareProviderManagerSharedPtr ShareProviderManager::singleton(Event::Dispatcher& RateLimitTokenBucket::RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill, std::chrono::milliseconds fill_interval, TimeSource& time_source) - : token_bucket_(max_tokens, time_source, fill_interval, + : token_bucket_(max_tokens, time_source, // Calculate the fill rate in tokens per second. tokens_per_fill / std::chrono::duration(fill_interval).count()), fill_interval_(fill_interval) {} diff --git a/source/extensions/filters/http/rate_limit_quota/global_client_impl.cc b/source/extensions/filters/http/rate_limit_quota/global_client_impl.cc index ef41067139231..f666533d301bf 100644 --- a/source/extensions/filters/http/rate_limit_quota/global_client_impl.cc +++ b/source/extensions/filters/http/rate_limit_quota/global_client_impl.cc @@ -251,9 +251,8 @@ createTokenBucketFromAction(const RateLimitStrategy& strategy, TimeSource& time_ ? max_tokens * (existing_token_bucket->remainingTokens() / existing_token_bucket->maxTokens()) : max_tokens; - return std::make_shared( - max_tokens, time_source, std::chrono::milliseconds(fill_interval_sec * 1000), - fill_rate_per_sec, initial_tokens); + return std::make_shared(max_tokens, time_source, fill_rate_per_sec, + initial_tokens); } void GlobalRateLimitClientImpl::onReceiveMessage(RateLimitQuotaResponsePtr&& response) { diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index a1ba30098b0c3..a8ed5a306888b 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -134,22 +134,20 @@ class AtomicTokenBucketImplTest : public testing::Test { // Verifies TokenBucket initialization. TEST_F(AtomicTokenBucketImplTest, Initialization) { - AtomicTokenBucketImpl token_bucket{1, time_system_, std::chrono::seconds(1), -1.0}; + AtomicTokenBucketImpl token_bucket{1, time_system_, -1.0}; EXPECT_EQ(1, token_bucket.fillRate()); EXPECT_EQ(1, token_bucket.maxTokens()); EXPECT_EQ(1, token_bucket.remainingTokens()); - EXPECT_EQ(token_bucket.nextTokenAvailable(), std::chrono::milliseconds(0)); EXPECT_EQ(1, token_bucket.consume(1, false)); EXPECT_EQ(0, token_bucket.consume(1, false)); EXPECT_EQ(false, token_bucket.consume()); - EXPECT_EQ(token_bucket.nextTokenAvailable(), std::chrono::seconds(1)); } // Verifies TokenBucket's maximum capacity. TEST_F(AtomicTokenBucketImplTest, MaxBucketSize) { - AtomicTokenBucketImpl token_bucket{3, time_system_, std::chrono::seconds(1), 1}; + AtomicTokenBucketImpl token_bucket{3, time_system_, 1}; EXPECT_EQ(1, token_bucket.fillRate()); EXPECT_EQ(3, token_bucket.maxTokens()); @@ -163,7 +161,7 @@ TEST_F(AtomicTokenBucketImplTest, MaxBucketSize) { // Verifies that TokenBucket can consume tokens. TEST_F(AtomicTokenBucketImplTest, Consume) { - AtomicTokenBucketImpl token_bucket{10, time_system_, std::chrono::seconds(1), 1}; + AtomicTokenBucketImpl token_bucket{10, time_system_, 1}; EXPECT_EQ(0, token_bucket.consume(20, false)); EXPECT_EQ(9, token_bucket.consume(9, false)); @@ -184,7 +182,7 @@ TEST_F(AtomicTokenBucketImplTest, Consume) { // Verifies that TokenBucket can refill tokens. TEST_F(AtomicTokenBucketImplTest, Refill) { - AtomicTokenBucketImpl token_bucket{1, time_system_, std::chrono::seconds(1), 0.5}; + AtomicTokenBucketImpl token_bucket{1, time_system_, 0.5}; EXPECT_EQ(1, token_bucket.consume(1, false)); time_system_.setMonotonicTime(std::chrono::milliseconds(500)); @@ -195,9 +193,18 @@ TEST_F(AtomicTokenBucketImplTest, Refill) { EXPECT_EQ(1, token_bucket.consume(1, false)); } +TEST_F(AtomicTokenBucketImplTest, NextTokenAvailable) { + AtomicTokenBucketImpl token_bucket{10, time_system_, 5}; + EXPECT_EQ(9, token_bucket.consume(9, false)); + EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); + EXPECT_EQ(1, token_bucket.consume(1, false)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); +} + // Test partial consumption of tokens. TEST_F(AtomicTokenBucketImplTest, PartialConsumption) { - AtomicTokenBucketImpl token_bucket{16, time_system_, std::chrono::seconds(1), 16}; + AtomicTokenBucketImpl token_bucket{16, time_system_, 16}; EXPECT_EQ(16, token_bucket.consume(18, true)); time_system_.advanceTimeWait(std::chrono::milliseconds(62)); EXPECT_EQ(0, token_bucket.consume(1, true)); @@ -209,8 +216,7 @@ TEST_F(AtomicTokenBucketImplTest, PartialConsumption) { TEST_F(AtomicTokenBucketImplTest, YearlyMinRefillRate) { constexpr uint64_t seconds_per_year = 365 * 24 * 60 * 60; // Set the fill rate to be 2 years. - AtomicTokenBucketImpl token_bucket{1, time_system_, std::chrono::seconds(seconds_per_year), - 1.0 / (seconds_per_year * 2)}; + AtomicTokenBucketImpl token_bucket{1, time_system_, 1.0 / (seconds_per_year * 2)}; // Consume first token. EXPECT_EQ(1, token_bucket.consume(1, false)); @@ -223,7 +229,7 @@ TEST_F(AtomicTokenBucketImplTest, YearlyMinRefillRate) { } TEST_F(AtomicTokenBucketImplTest, ConsumeNegativeTokens) { - AtomicTokenBucketImpl token_bucket{10, time_system_, std::chrono::seconds(1), 1}; + AtomicTokenBucketImpl token_bucket{10, time_system_, 1}; EXPECT_EQ(3, token_bucket.consume([](double) { return 3; })); EXPECT_EQ(7, token_bucket.remainingTokens()); @@ -232,7 +238,7 @@ TEST_F(AtomicTokenBucketImplTest, ConsumeNegativeTokens) { } TEST_F(AtomicTokenBucketImplTest, ConsumeSuperLargeTokens) { - AtomicTokenBucketImpl token_bucket{10, time_system_, std::chrono::seconds(1), 1}; + AtomicTokenBucketImpl token_bucket{10, time_system_, 1}; EXPECT_EQ(100, token_bucket.consume([](double) { return 100; })); EXPECT_EQ(-90, token_bucket.remainingTokens()); @@ -242,7 +248,7 @@ TEST_F(AtomicTokenBucketImplTest, MultipleThreadsConsume) { // Real time source to ensure we will not fall into endless loop. Event::TestRealTimeSystem real_time_source; - AtomicTokenBucketImpl token_bucket{1200, time_system_, std::chrono::seconds(1), 1.0}; + AtomicTokenBucketImpl token_bucket{1200, time_system_, 1.0}; // Exhaust all tokens. EXPECT_EQ(1200, token_bucket.consume(1200, false)); diff --git a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc index ca793507dbdca..60c0b2885b6e8 100644 --- a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc +++ b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc @@ -572,18 +572,18 @@ TEST_P(LocalRateLimitFilterIntegrationTest, HeaderTest) { // Max tokens is 2, the second request should be allowed. codec_client_ = makeHttpConnection(lookupPort("http")); - sendAndVerifyRequest("2", "0", "4"); + sendAndVerifyRequest("2", "0", "2"); cleanupUpstreamAndDownstream(); - // The third request should be rate limited, x-ratelimit-reset should be 4s. + // The third request should be rate limited, x-ratelimit-reset should be 2s. codec_client_ = makeHttpConnection(lookupPort("http")); - sendRateLimitedRequest("2", "0", "4"); + sendRateLimitedRequest("2", "0", "2"); cleanupUpstreamAndDownstream(); - // After 1s, the forth request should be rate limited, x-ratelimit-reset should be 3s. + // After 1s, the forth request should be rate limited, x-ratelimit-reset should be 1s. simTime().advanceTimeWait(std::chrono::seconds(1)); codec_client_ = makeHttpConnection(lookupPort("http")); - sendRateLimitedRequest("2", "0", "3"); + sendRateLimitedRequest("2", "0", "1"); } TEST_P(LocalRateLimitFilterIntegrationTest, PermitRequestAcrossDifferentConnections) { diff --git a/test/extensions/filters/http/rate_limit_quota/filter_test.cc b/test/extensions/filters/http/rate_limit_quota/filter_test.cc index fa401f0a1b5b6..96cdd58b18195 100644 --- a/test/extensions/filters/http/rate_limit_quota/filter_test.cc +++ b/test/extensions/filters/http/rate_limit_quota/filter_test.cc @@ -598,8 +598,7 @@ TEST_F(FilterTest, DecodeHeaderWithTokenBucketAllow) { token_bucket->mutable_fill_interval()->set_seconds(60); // 100 available tokens so the test doesn't get throttled. std::shared_ptr token_bucket_limiter = - std::make_shared(100, dispatcher_.timeSource(), - std::chrono::seconds(60), 100 / 60); + std::make_shared(100, dispatcher_.timeSource(), 100 / 60); RateLimitQuotaResponse::BucketAction no_assignment_action; no_assignment_action.mutable_quota_assignment_action() @@ -644,8 +643,7 @@ TEST_F(FilterTest, DecodeHeaderWithTokenBucketDeny) { token_bucket->mutable_tokens_per_fill()->set_value(1); token_bucket->mutable_fill_interval()->set_seconds(60); std::shared_ptr token_bucket_limiter = - std::make_shared(1, dispatcher_.timeSource(), std::chrono::seconds(60), - 1 / 60); + std::make_shared(1, dispatcher_.timeSource(), 1 / 60); // All subsequent requests should deny for 60 (mock) seconds. EXPECT_TRUE(token_bucket_limiter->consume()); From 6b4b3ff82adc1d3fa5eae905cc71ff7e99b82e43 Mon Sep 17 00:00:00 2001 From: zirain Date: Wed, 21 May 2025 10:59:28 +0800 Subject: [PATCH 12/13] fix Signed-off-by: zirain --- source/common/common/token_bucket_impl.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index faec1dd82dfeb..5417f67a1882f 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -2,6 +2,7 @@ #include #include +#include namespace Envoy { @@ -111,8 +112,9 @@ std::chrono::milliseconds AtomicTokenBucketImpl::nextTokenAvailable() const { // Calculate time since the last fill. double current_time = timeNowInSeconds(); + double last_time = time_in_seconds_.load(); return std::chrono::milliseconds( - static_cast(1 / fill_rate_ * 1000 - (current_time - time_in_seconds_.load()))); + static_cast(((1 / fill_rate_ - (current_time - last_time)) * 1000))); } } // namespace Envoy From f0156cd00b937b05967b4bb9fcafc81195117acc Mon Sep 17 00:00:00 2001 From: zirain Date: Thu, 22 May 2025 10:30:19 +0800 Subject: [PATCH 13/13] address wpbcode comment Signed-off-by: zirain --- source/common/common/token_bucket_impl.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 5417f67a1882f..bbed41b68d668 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -106,15 +106,13 @@ double AtomicTokenBucketImpl::timeNowInSeconds() const { std::chrono::milliseconds AtomicTokenBucketImpl::nextTokenAvailable() const { // If there are tokens available, return immediately. - if (remainingTokens() >= 1) { + const double remaining_tokens = remainingTokens(); + if (remaining_tokens >= 1) { return std::chrono::milliseconds(0); } - // Calculate time since the last fill. - double current_time = timeNowInSeconds(); - double last_time = time_in_seconds_.load(); return std::chrono::milliseconds( - static_cast(((1 / fill_rate_ - (current_time - last_time)) * 1000))); + static_cast(((1 - remaining_tokens) / fill_rate_) * 1000)); } } // namespace Envoy