Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,20 @@ message LocalRateLimit {

// Defines the standard version to use for X-RateLimit headers emitted by the filter.
//
// * ``X-RateLimit-Limit`` - indicates the request-quota associated to the
// client in the current time-window followed by the description of the
// quota policy.
// * ``X-RateLimit-Remaining`` - indicates the remaining requests in the
// current time-window.
// * ``X-RateLimit-Reset`` - indicates the number of seconds until reset of
// the current time-window.
//
// In case rate limiting policy specifies more then one time window, the values
// above represent the window that is closest to reaching its limit.
//
// For more information about the headers specification see selected version of
// the `draft RFC <https://tools.ietf.org/id/draft-polli-ratelimit-headers-03.html>`_.
//
// Disabled by default.
common.ratelimit.v3.XRateLimitHeadersRFCVersion enable_x_ratelimit_headers = 12
[(validate.rules).enum = {defined_only: true}];
Expand Down
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,8 @@ new_features:
added :ref:`enable_ja4_fingerprinting
<envoy_v3_api_field_extensions.filters.listener.tls_inspector.v3.TlsInspector.enable_ja4_fingerprinting>` to create
a JA4 fingerprint hash from the Client Hello message.
- area: local_ratelimit
change: |
``local_ratelimit`` will return ``x-ratelimit-reset`` header when the rate limit is exceeded.

deprecated:
14 changes: 14 additions & 0 deletions source/common/common/token_bucket_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <atomic>
#include <chrono>
#include <iostream>

namespace Envoy {

Expand Down Expand Up @@ -103,4 +104,17 @@ double AtomicTokenBucketImpl::timeNowInSeconds() const {
return std::chrono::duration<double>(time_source_.monotonicTime().time_since_epoch()).count();
}

std::chrono::milliseconds AtomicTokenBucketImpl::nextTokenAvailable() const {
// If there are tokens available, return immediately.
if (remainingTokens() >= 1) {
return std::chrono::milliseconds(0);
}

// Calculate time since the last fill.
double current_time = timeNowInSeconds();
double last_time = time_in_seconds_.load();
return std::chrono::milliseconds(
static_cast<uint64_t>(((1 / fill_rate_ - (current_time - last_time)) * 1000)));
}
Comment thread
zirain marked this conversation as resolved.
Comment on lines +107 to +116

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it's will be a very very very trival case, but it's possible when we check remainingTokens(), the remaining tokens are less then 1. Then, when we calculate the next available token, time passes, the 1 / fill_rate_ - (current_time - last_time) may return a minus value.

Comment on lines +107 to +116

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::chrono::milliseconds AtomicTokenBucketImpl::nextTokenAvailable() const {
// If there are tokens available, return immediately.
if (remainingTokens() >= 1) {
return std::chrono::milliseconds(0);
}
// Calculate time since the last fill.
double current_time = timeNowInSeconds();
double last_time = time_in_seconds_.load();
return std::chrono::milliseconds(
static_cast<uint64_t>(((1 / fill_rate_ - (current_time - last_time)) * 1000)));
}
std::chrono::milliseconds AtomicTokenBucketImpl::nextTokenAvailable() const {
// If there are tokens available, return immediately.
const double remaining_tokens = remainingTokens();
if (remaining_tokens >= 1) {
return std::chrono::milliseconds(0);
}
// Calculate time since the last fill.
return std::chrono::milliseconds(
static_cast<uint64_t>(((1 - remaining_tokens) / fill_rate_) * 1000));
}


} // namespace Envoy
8 changes: 7 additions & 1 deletion source/common/common/token_bucket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,19 @@ class AtomicTokenBucketImpl {
*/
double remainingTokens() const;

/**
* Get the time to next token available. This is a snapshot and may change after the call.
* @return the time to next token available.
*/
std::chrono::milliseconds nextTokenAvailable() const;

private:
double timeNowInSeconds() const;

const double max_tokens_;
const double fill_rate_;

std::atomic<double> time_in_seconds_{};
std::atomic<double> time_in_seconds_;
TimeSource& time_source_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class TokenBucketContext {

virtual uint64_t maxTokens() const PURE;
virtual uint64_t remainingTokens() const PURE;
virtual uint64_t resetSeconds() const PURE;
};

class RateLimitTokenBucket : public TokenBucketContext,
Expand All @@ -122,6 +123,9 @@ class RateLimitTokenBucket : public TokenBucketContext,
uint64_t remainingTokens() const override {
return static_cast<uint64_t>(token_bucket_.remainingTokens());
}
uint64_t resetSeconds() const override {
return static_cast<uint64_t>(std::ceil(token_bucket_.nextTokenAvailable().count() / 1000));
}

private:
AtomicTokenBucketImpl token_bucket_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ Http::FilterHeadersStatus Filter::encodeHeaders(Http::ResponseHeaderMap& headers
headers.addReferenceKey(
HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitRemaining,
token_bucket_context_->remainingTokens());
headers.addReferenceKey(
HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset,
token_bucket_context_->resetSeconds());
}

return Http::FilterHeadersStatus::Continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ createTokenBucketFromAction(const RateLimitStrategy& strategy, TimeSource& time_
? max_tokens * (existing_token_bucket->remainingTokens() /
existing_token_bucket->maxTokens())
: max_tokens;

return std::make_shared<AtomicTokenBucketImpl>(max_tokens, time_source, fill_rate_per_sec,
initial_tokens);
}
Expand Down
9 changes: 9 additions & 0 deletions test/common/common/token_bucket_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ TEST_F(AtomicTokenBucketImplTest, Refill) {
EXPECT_EQ(1, token_bucket.consume(1, false));
}

TEST_F(AtomicTokenBucketImplTest, NextTokenAvailable) {
AtomicTokenBucketImpl token_bucket{10, time_system_, 5};
EXPECT_EQ(9, token_bucket.consume(9, false));
EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable());
EXPECT_EQ(1, token_bucket.consume(1, false));
EXPECT_EQ(0, token_bucket.consume(1, false));
EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable());
}

// Test partial consumption of tokens.
TEST_F(AtomicTokenBucketImplTest, PartialConsumption) {
AtomicTokenBucketImpl token_bucket{16, time_system_, 16};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#include <chrono>

#include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h"
#include "source/extensions/filters/http/common/ratelimit_headers.h"

#include "test/integration/http_protocol_integration.h"
#include "test/test_common/test_runtime.h"
Expand Down Expand Up @@ -159,6 +162,29 @@ class LocalRateLimitFilterIntegrationTest : public Event::TestUsingSimulatedTime
EXPECT_EQ(expected_status, response->headers().getStatusValue());
EXPECT_EQ(expected_body_size, response->body().size());
}
void verifyResponse(IntegrationStreamDecoderPtr response, const std::string& expected_status,
size_t expected_body_size, const std::string& expected_limit,
const std::string& expected_remaining, const std::string& expected_reset) {
ASSERT_TRUE(response->waitForEndStream());
EXPECT_TRUE(response->complete());
EXPECT_EQ(expected_status, response->headers().getStatusValue());
EXPECT_EQ(expected_body_size, response->body().size());
EXPECT_THAT(
response->headers(),
Http::HeaderValueOf(
Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitLimit,
expected_limit));
EXPECT_THAT(
response->headers(),
Http::HeaderValueOf(Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get()
.XRateLimitRemaining,
expected_remaining));
EXPECT_THAT(
response->headers(),
Http::HeaderValueOf(
Extensions::HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset,
expected_reset));
}

void sendAndVerifyRequest(const std::string& cluster, const std::string& expected_status,
size_t expected_body_size) {
Expand All @@ -169,11 +195,29 @@ class LocalRateLimitFilterIntegrationTest : public Event::TestUsingSimulatedTime
EXPECT_TRUE(upstream_request_->complete());
EXPECT_EQ(0U, upstream_request_->bodyLength());
}
void sendAndVerifyRequest(const std::string& expected_limit,
const std::string& expected_remaining,
const std::string& expected_reset) {
auto response = codec_client_->makeRequestWithBody(default_request_headers_, 0);
waitForNextUpstreamRequest();
upstream_request_->encodeHeaders(default_response_headers_, 1);
verifyResponse(std::move(response), "200", 0, expected_limit, expected_remaining,
expected_reset);
EXPECT_TRUE(upstream_request_->complete());
EXPECT_EQ(0U, upstream_request_->bodyLength());
}
void sendRateLimitedRequest(const std::string& cluster) {
auto response = makeRequest(cluster);
verifyResponse(std::move(response), "429",
18); // 18 is the expected body size for rate-limited responses.
}
void sendRateLimitedRequest(const std::string& expected_limit,
const std::string& expected_remaining,
const std::string& expected_reset) {
auto response = codec_client_->makeRequestWithBody(default_request_headers_, 0);
verifyResponse(std::move(response), "429", 18, expected_limit, expected_remaining,
expected_reset);
}

static constexpr absl::string_view filter_config_ =
R"EOF(
Expand Down Expand Up @@ -203,6 +247,35 @@ name: envoy.filters.http.local_ratelimit
local_rate_limit_per_downstream_connection: {}
)EOF";

static constexpr absl::string_view limit_header_filter_config_ =
R"EOF(
name: envoy.filters.http.local_ratelimit
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit
stat_prefix: http_local_rate_limiter
enableXRatelimitHeaders: DRAFT_VERSION_03
token_bucket:
max_tokens: 2
tokens_per_fill: 2
fill_interval: 4s
filter_enabled:
runtime_key: local_rate_limit_enabled
default_value:
numerator: 100
denominator: HUNDRED
filter_enforced:
runtime_key: local_rate_limit_enforced
default_value:
numerator: 100
denominator: HUNDRED
response_headers_to_add:
- append_action: OVERWRITE_IF_EXISTS_OR_ADD
header:
key: x-local-rate-limit
value: 'true'
local_rate_limit_per_downstream_connection: {}
)EOF";

static constexpr absl::string_view filter_config_with_blank_value_descriptor_ =
R"EOF(
name: envoy.filters.http.local_ratelimit
Expand Down Expand Up @@ -489,6 +562,30 @@ TEST_P(LocalRateLimitFilterIntegrationTest, DenyRequestWithinSameConnection) {
EXPECT_EQ(18, response->body().size());
}

TEST_P(LocalRateLimitFilterIntegrationTest, HeaderTest) {
initializeFilter(fmt::format(limit_header_filter_config_, "false"));

// The first request should be allowed.
codec_client_ = makeHttpConnection(lookupPort("http"));
sendAndVerifyRequest("2", "1", "0");
cleanupUpstreamAndDownstream();

// Max tokens is 2, the second request should be allowed.
codec_client_ = makeHttpConnection(lookupPort("http"));
sendAndVerifyRequest("2", "0", "2");
cleanupUpstreamAndDownstream();

// The third request should be rate limited, x-ratelimit-reset should be 2s.
codec_client_ = makeHttpConnection(lookupPort("http"));
sendRateLimitedRequest("2", "0", "2");
cleanupUpstreamAndDownstream();

// After 1s, the forth request should be rate limited, x-ratelimit-reset should be 1s.
simTime().advanceTimeWait(std::chrono::seconds(1));
codec_client_ = makeHttpConnection(lookupPort("http"));
sendRateLimitedRequest("2", "0", "1");
}

TEST_P(LocalRateLimitFilterIntegrationTest, PermitRequestAcrossDifferentConnections) {
initializeFilter(fmt::format(filter_config_, "true"));

Expand Down Expand Up @@ -562,7 +659,6 @@ TEST_P(LocalRateLimitFilterIntegrationTest, BasicTestPerRouteAndRds) {
EXPECT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().getStatusValue());
EXPECT_EQ(0, response->body().size());

cleanupUpstreamAndDownstream();

cleanUpXdsConnection();
Expand Down
Loading