Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ removed_config_or_runtime:
- area: dns
change: |
Removed runtime flag ``envoy.reloadable_features.dns_details`` and legacy code paths.
- area: local_ratelimit
change: |
Removed runtime guard ``envoy.reloadable_features.no_timer_based_rate_limit_token_bucket`` and legacy code paths.

new_features:
- area: oauth2
Expand Down
1 change: 0 additions & 1 deletion source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ RUNTIME_GUARD(envoy_reloadable_features_logging_with_fast_json_formatter);
RUNTIME_GUARD(envoy_reloadable_features_lua_flow_control_while_http_call);
RUNTIME_GUARD(envoy_reloadable_features_mmdb_files_reload_enabled);
RUNTIME_GUARD(envoy_reloadable_features_no_extension_lookup_by_name);
RUNTIME_GUARD(envoy_reloadable_features_no_timer_based_rate_limit_token_bucket);
RUNTIME_GUARD(envoy_reloadable_features_normalize_rds_provider_config);
RUNTIME_GUARD(envoy_reloadable_features_oauth2_use_refresh_token);
RUNTIME_GUARD(envoy_reloadable_features_original_dst_rely_on_idle_timeout);
Expand Down
157 changes: 16 additions & 141 deletions source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,95 +75,14 @@ ShareProviderManagerSharedPtr ShareProviderManager::singleton(Event::Dispatcher&
});
}

TimerTokenBucket::TimerTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval, uint64_t multiplier,
LocalRateLimiterImpl& parent)
: multiplier_(multiplier), parent_(parent), max_tokens_(max_tokens),
tokens_per_fill_(tokens_per_fill), fill_interval_(fill_interval),
// Calculate the fill rate in tokens per second.
fill_rate_(tokens_per_fill /
std::chrono::duration_cast<std::chrono::duration<double>>(fill_interval).count()) {
ASSERT(multiplier_ != 0);
tokens_ = max_tokens;
fill_time_ = parent_.time_source_.monotonicTime();
}

absl::optional<int64_t> TimerTokenBucket::remainingFillInterval() const {
using namespace std::literals;

const auto time_after_last_fill = std::chrono::duration_cast<std::chrono::milliseconds>(
parent_.time_source_.monotonicTime() - fill_time_.load());

// Note that the fill timer may be delayed because other tasks are running on the main thread.
// So it's possible that the time_after_last_fill is greater than fill_interval_.
if (time_after_last_fill >= fill_interval_) {
return {};
}

return absl::ToInt64Seconds(absl::FromChrono(fill_interval_) -
absl::Seconds((time_after_last_fill) / 1s));
}

bool TimerTokenBucket::consume(double, uint64_t to_consume) {
// Relaxed consistency is used for all operations because we don't care about ordering, just the
// final atomic correctness.
uint64_t expected_tokens = tokens_.load(std::memory_order_relaxed);
do {
// expected_tokens is either initialized above or reloaded during the CAS failure below.
if (expected_tokens < to_consume) {
return false;
}

// Testing hook.
parent_.synchronizer_.syncPoint("allowed_pre_cas");

// Loop while the weak CAS fails trying to subtract tokens from expected.
} while (!tokens_.compare_exchange_weak(expected_tokens, expected_tokens - to_consume,
std::memory_order_relaxed));

// We successfully decremented the counter by tokens.
return true;
}

void TimerTokenBucket::onFillTimer(uint64_t refill_counter, double factor) {
// Descriptors are refilled every Nth timer hit where N is the ratio of the
// descriptor refill interval over the global refill interval. For example,
// if the descriptor refill interval is 150ms and the global refill
// interval is 50ms, this descriptor is refilled every 3rd call.
ASSERT(multiplier_ != 0);
if (refill_counter % multiplier_ != 0) {
return;
}

const uint64_t tokens_per_fill = std::ceil(tokens_per_fill_ * factor);

// Relaxed consistency is used for all operations because we don't care about ordering, just the
// final atomic correctness.
uint64_t expected_tokens = tokens_.load(std::memory_order_relaxed);
uint64_t new_tokens_value{};
do {
// expected_tokens is either initialized above or reloaded during the CAS failure below.
new_tokens_value = std::min(max_tokens_, expected_tokens + tokens_per_fill);

// Testing hook.
parent_.synchronizer_.syncPoint("on_fill_timer_pre_cas");

// Loop while the weak CAS fails trying to update the tokens value.
} while (
!tokens_.compare_exchange_weak(expected_tokens, new_tokens_value, std::memory_order_relaxed));

// Update fill time at last.
fill_time_ = parent_.time_source_.monotonicTime();
}

AtomicTokenBucket::AtomicTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval,
TimeSource& time_source)
RateLimitTokenBucket::RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval,
TimeSource& time_source)
: token_bucket_(max_tokens, time_source,
// Calculate the fill rate in tokens per second.
tokens_per_fill / std::chrono::duration<double>(fill_interval).count()) {}

bool AtomicTokenBucket::consume(double factor, uint64_t to_consume) {
bool RateLimitTokenBucket::consume(double factor, uint64_t to_consume) {
ASSERT(!(factor <= 0.0 || factor > 1.0));
auto cb = [tokens = to_consume / factor](double total) { return total < tokens ? 0.0 : tokens; };
return token_bucket_.consume(cb) != 0.0;
Expand All @@ -175,29 +94,15 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
const Protobuf::RepeatedPtrField<
envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors,
bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider)
: fill_timer_(fill_interval > std::chrono::milliseconds(0)
? dispatcher.createTimer([this] { onFillTimer(); })
: nullptr),
time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)),
always_consume_default_token_bucket_(always_consume_default_token_bucket),
no_timer_based_rate_limit_token_bucket_(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.no_timer_based_rate_limit_token_bucket")) {
if (fill_timer_ && fill_interval < std::chrono::milliseconds(50)) {
throw EnvoyException("local rate limit token bucket fill timer must be >= 50ms");
}
: time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)),
always_consume_default_token_bucket_(always_consume_default_token_bucket) {

if (no_timer_based_rate_limit_token_bucket_) {
default_token_bucket_ = std::make_shared<AtomicTokenBucket>(max_tokens, tokens_per_fill,
fill_interval, time_source_);
} else {
default_token_bucket_ =
std::make_shared<TimerTokenBucket>(max_tokens, tokens_per_fill, fill_interval, 1, *this);
if (fill_interval < std::chrono::milliseconds(50)) {
throw EnvoyException("local rate limit token bucket fill timer must be >= 50ms");
}

if (fill_timer_ && default_token_bucket_->fillInterval().count() > 0 &&
!no_timer_based_rate_limit_token_bucket_) {
fill_timer_->enableTimer(default_token_bucket_->fillInterval());
}
default_token_bucket_ = std::make_shared<RateLimitTokenBucket>(max_tokens, tokens_per_fill,
fill_interval, time_source_);

for (const auto& descriptor : descriptors) {
RateLimit::LocalDescriptor new_descriptor;
Expand All @@ -222,19 +127,11 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
throw EnvoyException(
"local rate descriptor limit is not a multiple of token bucket fill timer");
}
// Save the multiplicative factor to control the descriptor refill frequency.
const auto per_descriptor_multiplier = per_descriptor_fill_interval / fill_interval;

RateLimitTokenBucketSharedPtr per_descriptor_token_bucket;
if (no_timer_based_rate_limit_token_bucket_) {
per_descriptor_token_bucket = std::make_shared<AtomicTokenBucket>(
per_descriptor_max_tokens, per_descriptor_tokens_per_fill, per_descriptor_fill_interval,
time_source_);
} else {
per_descriptor_token_bucket = std::make_shared<TimerTokenBucket>(
per_descriptor_max_tokens, per_descriptor_tokens_per_fill, per_descriptor_fill_interval,
per_descriptor_multiplier, *this);
}

RateLimitTokenBucketSharedPtr per_descriptor_token_bucket =
std::make_shared<RateLimitTokenBucket>(per_descriptor_max_tokens,
per_descriptor_tokens_per_fill,
per_descriptor_fill_interval, time_source_);

auto result =
descriptors_.emplace(std::move(new_descriptor), std::move(per_descriptor_token_bucket));
Expand All @@ -245,29 +142,7 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
}
}

LocalRateLimiterImpl::~LocalRateLimiterImpl() {
if (fill_timer_ != nullptr) {
fill_timer_->disableTimer();
}
}

void LocalRateLimiterImpl::onFillTimer() {
// Since descriptors tokens are refilled whenever the remainder of dividing refill_counter_
// by descriptor.multiplier_ is zero and refill_counter_ is initialized to zero, it must be
// incremented before executing the onFillTimerDescriptorHelper() method to prevent all
// descriptors tokens from being refilled at the first time hit, regardless of its fill
// interval configuration.
refill_counter_++;
const double share_factor =
share_provider_ != nullptr ? share_provider_->getTokensShareFactor() : 1.0;

default_token_bucket_->onFillTimer(refill_counter_, share_factor);
for (const auto& descriptor : descriptors_) {
descriptor.second->onFillTimer(refill_counter_, share_factor);
}

fill_timer_->enableTimer(default_token_bucket_->fillInterval());
}
LocalRateLimiterImpl::~LocalRateLimiterImpl() = default;

struct MatchResult {
std::reference_wrapper<RateLimitTokenBucket> token_bucket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,58 +67,17 @@ class TokenBucketContext {
virtual absl::optional<int64_t> remainingFillInterval() const PURE;
};

class RateLimitTokenBucket : public TokenBucketContext {
public:
virtual bool consume(double factor = 1.0, uint64_t tokens = 1) PURE;
virtual void onFillTimer(uint64_t refill_counter, double factor = 1.0) PURE;
virtual std::chrono::milliseconds fillInterval() const PURE;
virtual double fillRate() const PURE;
};
using RateLimitTokenBucketSharedPtr = std::shared_ptr<RateLimitTokenBucket>;

class LocalRateLimiterImpl;

// Token bucket that implements based on the periodic timer.
class TimerTokenBucket : public RateLimitTokenBucket {
class RateLimitTokenBucket : public TokenBucketContext {
public:
TimerTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval, uint64_t multiplier,
LocalRateLimiterImpl& parent);
RateLimitTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval, TimeSource& time_source);

// RateLimitTokenBucket
bool consume(double factor = 1.0, uint64_t tokens = 1) override;
void onFillTimer(uint64_t refill_counter, double factor) override;
std::chrono::milliseconds fillInterval() const override { return fill_interval_; }
double fillRate() const override { return fill_rate_; }
uint64_t maxTokens() const override { return max_tokens_; }
uint64_t remainingTokens() const override { return tokens_.load(); }
absl::optional<int64_t> remainingFillInterval() const override;

// Descriptor refill interval is a multiple of the timer refill interval.
// For example, if the descriptor refill interval is 150ms and the global
// refill interval is 50ms, the value is 3. Every 3rd invocation of
// the global timer, the descriptor is refilled.
const uint64_t multiplier_{};
LocalRateLimiterImpl& parent_;
std::atomic<uint64_t> tokens_{};
std::atomic<MonotonicTime> fill_time_{};

const uint64_t max_tokens_{};
const uint64_t tokens_per_fill_{};
const std::chrono::milliseconds fill_interval_{};
const double fill_rate_{};
};
bool consume(double factor = 1.0, uint64_t tokens = 1);
double fillRate() const { return token_bucket_.fillRate(); }

class AtomicTokenBucket : public RateLimitTokenBucket {
public:
AtomicTokenBucket(uint64_t max_tokens, uint64_t tokens_per_fill,
std::chrono::milliseconds fill_interval, TimeSource& time_source);

// RateLimitTokenBucket
bool consume(double factor = 1.0, uint64_t tokens = 1) override;
void onFillTimer(uint64_t, double) override {}
std::chrono::milliseconds fillInterval() const override { return {}; }
double fillRate() const override { return token_bucket_.fillRate(); }
uint64_t maxTokens() const override { return static_cast<uint64_t>(token_bucket_.maxTokens()); }
uint64_t remainingTokens() const override {
return static_cast<uint64_t>(token_bucket_.remainingTokens());
Expand All @@ -128,6 +87,7 @@ class AtomicTokenBucket : public RateLimitTokenBucket {
private:
AtomicTokenBucketImpl token_bucket_;
};
using RateLimitTokenBucketSharedPtr = std::shared_ptr<RateLimitTokenBucket>;

class LocalRateLimiterImpl {
public:
Expand All @@ -148,24 +108,15 @@ class LocalRateLimiterImpl {
Result requestAllowed(absl::Span<const RateLimit::Descriptor> request_descriptors) const;

private:
void onFillTimer();

RateLimitTokenBucketSharedPtr default_token_bucket_;

const Event::TimerPtr fill_timer_;
TimeSource& time_source_;
RateLimit::LocalDescriptor::Map<RateLimitTokenBucketSharedPtr> descriptors_;
// Refill counter is incremented per each refill timer hit.
uint64_t refill_counter_{0};

ShareProviderSharedPtr share_provider_;

mutable Thread::ThreadSynchronizer synchronizer_; // Used for testing only.
const bool always_consume_default_token_bucket_{};
const bool no_timer_based_rate_limit_token_bucket_{};

friend class LocalRateLimiterImplTest;
friend class TimerTokenBucket;
};

} // namespace LocalRateLimit
Expand Down
Loading