Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ removed_config_or_runtime:
- area: dns
change: |
Removed runtime flag ``envoy.reloadable_features.dns_details`` and legacy code paths.
- area: local_ratelimit
change: |
Removed runtime guard ``envoy.reloadable_features.no_timer_based_rate_limit_token_bucket`` and legacy code paths.

new_features:
- area: oauth2
Expand Down
1 change: 0 additions & 1 deletion source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ RUNTIME_GUARD(envoy_reloadable_features_logging_with_fast_json_formatter);
RUNTIME_GUARD(envoy_reloadable_features_lua_flow_control_while_http_call);
RUNTIME_GUARD(envoy_reloadable_features_mmdb_files_reload_enabled);
RUNTIME_GUARD(envoy_reloadable_features_no_extension_lookup_by_name);
RUNTIME_GUARD(envoy_reloadable_features_no_timer_based_rate_limit_token_bucket);
RUNTIME_GUARD(envoy_reloadable_features_normalize_rds_provider_config);
RUNTIME_GUARD(envoy_reloadable_features_oauth2_use_refresh_token);
RUNTIME_GUARD(envoy_reloadable_features_original_dst_rely_on_idle_timeout);
Expand Down
174 changes: 27 additions & 147 deletions source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,95 +75,14 @@ ShareProviderManagerSharedPtr ShareProviderManager::singleton(Event::Dispatcher&
});
}

TimerTokenBucket::TimerTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval, uint64_t multiplier,
LocalRateLimiterImpl& parent)
: multiplier_(multiplier), parent_(parent), max_tokens_(max_tokens),
tokens_per_fill_(tokens_per_fill), fill_interval_(fill_interval),
// Calculate the fill rate in tokens per second.
fill_rate_(tokens_per_fill /
std::chrono::duration_cast<std::chrono::duration<double>>(fill_interval).count()) {
ASSERT(multiplier_ != 0);
tokens_ = max_tokens;
fill_time_ = parent_.time_source_.monotonicTime();
}

absl::optional<int64_t> TimerTokenBucket::remainingFillInterval() const {
using namespace std::literals;

const auto time_after_last_fill = std::chrono::duration_cast<std::chrono::milliseconds>(
parent_.time_source_.monotonicTime() - fill_time_.load());

// Note that the fill timer may be delayed because other tasks are running on the main thread.
// So it's possible that the time_after_last_fill is greater than fill_interval_.
if (time_after_last_fill >= fill_interval_) {
return {};
}

return absl::ToInt64Seconds(absl::FromChrono(fill_interval_) -
absl::Seconds((time_after_last_fill) / 1s));
}

bool TimerTokenBucket::consume(double, uint64_t to_consume) {
// Relaxed consistency is used for all operations because we don't care about ordering, just the
// final atomic correctness.
uint64_t expected_tokens = tokens_.load(std::memory_order_relaxed);
do {
// expected_tokens is either initialized above or reloaded during the CAS failure below.
if (expected_tokens < to_consume) {
return false;
}

// Testing hook.
parent_.synchronizer_.syncPoint("allowed_pre_cas");

// Loop while the weak CAS fails trying to subtract tokens from expected.
} while (!tokens_.compare_exchange_weak(expected_tokens, expected_tokens - to_consume,
std::memory_order_relaxed));

// We successfully decremented the counter by tokens.
return true;
}

void TimerTokenBucket::onFillTimer(uint64_t refill_counter, double factor) {
// Descriptors are refilled every Nth timer hit where N is the ratio of the
// descriptor refill interval over the global refill interval. For example,
// if the descriptor refill interval is 150ms and the global refill
// interval is 50ms, this descriptor is refilled every 3rd call.
ASSERT(multiplier_ != 0);
if (refill_counter % multiplier_ != 0) {
return;
}

const uint64_t tokens_per_fill = std::ceil(tokens_per_fill_ * factor);

// Relaxed consistency is used for all operations because we don't care about ordering, just the
// final atomic correctness.
uint64_t expected_tokens = tokens_.load(std::memory_order_relaxed);
uint64_t new_tokens_value{};
do {
// expected_tokens is either initialized above or reloaded during the CAS failure below.
new_tokens_value = std::min(max_tokens_, expected_tokens + tokens_per_fill);

// Testing hook.
parent_.synchronizer_.syncPoint("on_fill_timer_pre_cas");

// Loop while the weak CAS fails trying to update the tokens value.
} while (
!tokens_.compare_exchange_weak(expected_tokens, new_tokens_value, std::memory_order_relaxed));

// Update fill time at last.
fill_time_ = parent_.time_source_.monotonicTime();
}

AtomicTokenBucket::AtomicTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval,
TimeSource& time_source)
RateLimitTokenBucket::RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval,
TimeSource& time_source)
: token_bucket_(max_tokens, time_source,
// Calculate the fill rate in tokens per second.
tokens_per_fill / std::chrono::duration<double>(fill_interval).count()) {}

bool AtomicTokenBucket::consume(double factor, uint64_t to_consume) {
bool RateLimitTokenBucket::consume(double factor, uint64_t to_consume) {
ASSERT(!(factor <= 0.0 || factor > 1.0));
auto cb = [tokens = to_consume / factor](double total) { return total < tokens ? 0.0 : tokens; };
return token_bucket_.consume(cb) != 0.0;
Expand All @@ -175,28 +94,17 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
const Protobuf::RepeatedPtrField<
envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors,
bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider)
: fill_timer_(fill_interval > std::chrono::milliseconds(0)
? dispatcher.createTimer([this] { onFillTimer(); })
: nullptr),
time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)),
always_consume_default_token_bucket_(always_consume_default_token_bucket),
no_timer_based_rate_limit_token_bucket_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.no_timer_based_rate_limit_token_bucket")) {
if (fill_timer_ && fill_interval < std::chrono::milliseconds(50)) {
throw EnvoyException("local rate limit token bucket fill timer must be >= 50ms");
}

if (no_timer_based_rate_limit_token_bucket_) {
default_token_bucket_ = std::make_shared<AtomicTokenBucket>(max_tokens, tokens_per_fill,
fill_interval, time_source_);
} else {
default_token_bucket_ =
std::make_shared<TimerTokenBucket>(max_tokens, tokens_per_fill, fill_interval, 1, *this);
}

if (fill_timer_ && default_token_bucket_->fillInterval().count() > 0 &&
!no_timer_based_rate_limit_token_bucket_) {
fill_timer_->enableTimer(default_token_bucket_->fillInterval());
: time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)),
always_consume_default_token_bucket_(always_consume_default_token_bucket) {

// Ignore the default token bucket if fill_interval is 0 because 0 fill_interval means nothing
// and has undefined behavior.
if (fill_interval.count() > 0) {
if (fill_interval < std::chrono::milliseconds(50)) {
throw EnvoyException("local rate limit token bucket fill timer must be >= 50ms");
}
default_token_bucket_ = std::make_shared<RateLimitTokenBucket>(max_tokens, tokens_per_fill,
fill_interval, time_source_);
}

for (const auto& descriptor : descriptors) {
Expand All @@ -218,23 +126,10 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
throw EnvoyException("local rate limit descriptor token bucket fill timer must be >= 50ms");
}

if (per_descriptor_fill_interval.count() % fill_interval.count() != 0) {
throw EnvoyException(
"local rate descriptor limit is not a multiple of token bucket fill timer");
}
// Save the multiplicative factor to control the descriptor refill frequency.
const auto per_descriptor_multiplier = per_descriptor_fill_interval / fill_interval;

RateLimitTokenBucketSharedPtr per_descriptor_token_bucket;
if (no_timer_based_rate_limit_token_bucket_) {
per_descriptor_token_bucket = std::make_shared<AtomicTokenBucket>(
per_descriptor_max_tokens, per_descriptor_tokens_per_fill, per_descriptor_fill_interval,
time_source_);
} else {
per_descriptor_token_bucket = std::make_shared<TimerTokenBucket>(
per_descriptor_max_tokens, per_descriptor_tokens_per_fill, per_descriptor_fill_interval,
per_descriptor_multiplier, *this);
}
RateLimitTokenBucketSharedPtr per_descriptor_token_bucket =
std::make_shared<RateLimitTokenBucket>(per_descriptor_max_tokens,
per_descriptor_tokens_per_fill,
per_descriptor_fill_interval, time_source_);

auto result =
descriptors_.emplace(std::move(new_descriptor), std::move(per_descriptor_token_bucket));
Expand All @@ -245,29 +140,7 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
}
}

LocalRateLimiterImpl::~LocalRateLimiterImpl() {
if (fill_timer_ != nullptr) {
fill_timer_->disableTimer();
}
}

void LocalRateLimiterImpl::onFillTimer() {
// Since descriptors tokens are refilled whenever the remainder of dividing refill_counter_
// by descriptor.multiplier_ is zero and refill_counter_ is initialized to zero, it must be
// incremented before executing the onFillTimerDescriptorHelper() method to prevent all
// descriptors tokens from being refilled at the first time hit, regardless of its fill
// interval configuration.
refill_counter_++;
const double share_factor =
share_provider_ != nullptr ? share_provider_->getTokensShareFactor() : 1.0;

default_token_bucket_->onFillTimer(refill_counter_, share_factor);
for (const auto& descriptor : descriptors_) {
descriptor.second->onFillTimer(refill_counter_, share_factor);
}

fill_timer_->enableTimer(default_token_bucket_->fillInterval());
}
LocalRateLimiterImpl::~LocalRateLimiterImpl() = default;

struct MatchResult {
std::reference_wrapper<RateLimitTokenBucket> token_bucket;
Expand Down Expand Up @@ -312,6 +185,13 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed(

// See if the request is forbidden by the default token bucket.
if (matched_results.empty() || always_consume_default_token_bucket_) {
if (default_token_bucket_ == nullptr) {
return {true, matched_results.empty()
? makeOptRefFromPtr<TokenBucketContext>(nullptr)
: makeOptRef<TokenBucketContext>(matched_results[0].token_bucket.get())};
}
ASSERT(default_token_bucket_ != nullptr);

if (const bool result = default_token_bucket_->consume(share_factor); !result) {
// If the request is forbidden by the default token bucket, return the result and the
// default token bucket.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,70 +64,28 @@ class TokenBucketContext {

virtual uint64_t maxTokens() const PURE;
virtual uint64_t remainingTokens() const PURE;
virtual absl::optional<int64_t> remainingFillInterval() const PURE;
};

class RateLimitTokenBucket : public TokenBucketContext {
public:
virtual bool consume(double factor = 1.0, uint64_t tokens = 1) PURE;
virtual void onFillTimer(uint64_t refill_counter, double factor = 1.0) PURE;
virtual std::chrono::milliseconds fillInterval() const PURE;
virtual double fillRate() const PURE;
};
using RateLimitTokenBucketSharedPtr = std::shared_ptr<RateLimitTokenBucket>;

class LocalRateLimiterImpl;

// Token bucket that implements based on the periodic timer.
class TimerTokenBucket : public RateLimitTokenBucket {
class RateLimitTokenBucket : public TokenBucketContext {
public:
TimerTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval, uint64_t multiplier,
LocalRateLimiterImpl& parent);
RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval, TimeSource& time_source);

// RateLimitTokenBucket
bool consume(double factor = 1.0, uint64_t tokens = 1) override;
void onFillTimer(uint64_t refill_counter, double factor) override;
std::chrono::milliseconds fillInterval() const override { return fill_interval_; }
double fillRate() const override { return fill_rate_; }
uint64_t maxTokens() const override { return max_tokens_; }
uint64_t remainingTokens() const override { return tokens_.load(); }
absl::optional<int64_t> remainingFillInterval() const override;

// Descriptor refill interval is a multiple of the timer refill interval.
// For example, if the descriptor refill interval is 150ms and the global
// refill interval is 50ms, the value is 3. Every 3rd invocation of
// the global timer, the descriptor is refilled.
const uint64_t multiplier_{};
LocalRateLimiterImpl& parent_;
std::atomic<uint64_t> tokens_{};
std::atomic<MonotonicTime> fill_time_{};

const uint64_t max_tokens_{};
const uint64_t tokens_per_fill_{};
const std::chrono::milliseconds fill_interval_{};
const double fill_rate_{};
};
bool consume(double factor = 1.0, uint64_t tokens = 1);
double fillRate() const { return token_bucket_.fillRate(); }

class AtomicTokenBucket : public RateLimitTokenBucket {
public:
AtomicTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval, TimeSource& time_source);

// RateLimitTokenBucket
bool consume(double factor = 1.0, uint64_t tokens = 1) override;
void onFillTimer(uint64_t, double) override {}
std::chrono::milliseconds fillInterval() const override { return {}; }
double fillRate() const override { return token_bucket_.fillRate(); }
uint64_t maxTokens() const override { return static_cast<uint64_t>(token_bucket_.maxTokens()); }
uint64_t remainingTokens() const override {
return static_cast<uint64_t>(token_bucket_.remainingTokens());
}
absl::optional<int64_t> remainingFillInterval() const override { return {}; }

private:
AtomicTokenBucketImpl token_bucket_;
};
using RateLimitTokenBucketSharedPtr = std::shared_ptr<RateLimitTokenBucket>;

class LocalRateLimiterImpl {
public:
Expand All @@ -148,24 +106,15 @@ class LocalRateLimiterImpl {
Result requestAllowed(absl::Span<const RateLimit::Descriptor> request_descriptors) const;

private:
void onFillTimer();

RateLimitTokenBucketSharedPtr default_token_bucket_;

const Event::TimerPtr fill_timer_;
TimeSource& time_source_;
RateLimit::LocalDescriptor::Map<RateLimitTokenBucketSharedPtr> descriptors_;
// Refill counter is incremented per each refill timer hit.
uint64_t refill_counter_{0};

ShareProviderSharedPtr share_provider_;

mutable Thread::ThreadSynchronizer synchronizer_; // Used for testing only.
const bool always_consume_default_token_bucket_{};
const bool no_timer_based_rate_limit_token_bucket_{};

friend class LocalRateLimiterImplTest;
friend class TimerTokenBucket;
};

} // namespace LocalRateLimit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,6 @@ Http::FilterHeadersStatus Filter::encodeHeaders(Http::ResponseHeaderMap& headers
headers.addReferenceKey(
HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitRemaining,
token_bucket_context_->remainingTokens());
const auto reset = token_bucket_context_->remainingFillInterval();
if (reset.has_value()) {
headers.addReferenceKey(
HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitReset, reset.value());
}
}

return Http::FilterHeadersStatus::Continue;
Expand Down
Loading
Loading