-
Notifications
You must be signed in to change notification settings - Fork 94
rate-limiting: test enhancement + refactoring #223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
90bd582
bf5ae1d
28011ef
be93d1a
76158b9
d29c95c
ebd857c
1d62a00
c86cb92
7fc2992
9def3cd
abda9db
4c54837
a8c94d8
88d335e
6d1e747
798de2b
4d6448b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,8 +6,10 @@ | |
|
|
||
| namespace Nighthawk { | ||
|
|
||
| using namespace std::chrono_literals; | ||
|
|
||
| BurstingRateLimiter::BurstingRateLimiter(RateLimiterPtr&& rate_limiter, const uint64_t burst_size) | ||
| : rate_limiter_(std::move(rate_limiter)), burst_size_(burst_size) { | ||
| : ForwardingRateLimiterImpl(std::move(rate_limiter)), burst_size_(burst_size) { | ||
| ASSERT(burst_size_ > 0); | ||
| } | ||
|
|
||
|
|
@@ -51,28 +53,23 @@ void BurstingRateLimiter::releaseOne() { | |
| } | ||
|
|
||
| LinearRateLimiter::LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency) | ||
| : time_source_(time_source), acquireable_count_(0), acquired_count_(0), frequency_(frequency) { | ||
| : RateLimiterBaseImpl(time_source), acquireable_count_(0), acquired_count_(0), | ||
| frequency_(frequency) { | ||
| if (frequency.value() <= 0) { | ||
| throw NighthawkException("Frequency must be > 0"); | ||
| } | ||
| } | ||
|
|
||
| bool LinearRateLimiter::tryAcquireOne() { | ||
| // TODO(oschaaf): consider adding an explicit start() call to the interface. | ||
| if (!started_) { | ||
| started_at_ = time_source_.monotonicTime(); | ||
| started_ = true; | ||
| } | ||
| if (acquireable_count_ > 0) { | ||
| acquireable_count_--; | ||
| acquired_count_++; | ||
| return true; | ||
| } | ||
|
|
||
| const auto elapsed_since_start = time_source_.monotonicTime() - started_at_; | ||
| acquireable_count_ = | ||
| static_cast<int64_t>(std::floor(elapsed_since_start / frequency_.interval())) - | ||
| acquired_count_; | ||
| static_cast<int64_t>(std::floor(elapsed() / frequency_.interval())) - acquired_count_; | ||
| return acquireable_count_ > 0 ? tryAcquireOne() : false; | ||
| } | ||
|
|
||
|
|
@@ -81,38 +78,45 @@ void LinearRateLimiter::releaseOne() { | |
| acquired_count_--; | ||
| } | ||
|
|
||
| DelegatingRateLimiter::DelegatingRateLimiter(Envoy::TimeSource& time_source, | ||
| RateLimiterPtr&& rate_limiter, | ||
| RateLimiterDelegate random_distribution_generator) | ||
| : random_distribution_generator_(std::move(random_distribution_generator)), | ||
| time_source_(time_source), rate_limiter_(std::move(rate_limiter)) {} | ||
| DelegatingRateLimiterImpl::DelegatingRateLimiterImpl( | ||
| RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator) | ||
| : ForwardingRateLimiterImpl(std::move(rate_limiter)), | ||
| random_distribution_generator_(std::move(random_distribution_generator)) {} | ||
|
|
||
| bool DelegatingRateLimiter::tryAcquireOne() { | ||
| bool DelegatingRateLimiterImpl::tryAcquireOne() { | ||
| const auto now = timeSource().monotonicTime(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a slight functional change here? I'm not 100% sure I understand how monotonicTime() works here, but because you're pulling this value before tryAcquireOne here, if tryAcquireOne were to ever have a delay for any reason (even if only in certain impls), what If you don't think this is a legitimate concern, I will completely defer to you on this.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice, catch; I backed this out.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! |
||
| if (distributed_start_ == absl::nullopt) { | ||
| if (rate_limiter_->tryAcquireOne()) { | ||
| distributed_start_ = time_source_.monotonicTime() + random_distribution_generator_(); | ||
| distributed_start_ = now + random_distribution_generator_(); | ||
| } | ||
| } | ||
|
|
||
| if (distributed_start_ != absl::nullopt && distributed_start_ <= time_source_.monotonicTime()) { | ||
| if (distributed_start_ != absl::nullopt && distributed_start_ <= now) { | ||
| distributed_start_ = absl::nullopt; | ||
| return true; | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
|
|
||
| void DelegatingRateLimiter::releaseOne() { | ||
| void DelegatingRateLimiterImpl::releaseOne() { | ||
| distributed_start_ = absl::nullopt; | ||
| rate_limiter_->releaseOne(); | ||
| } | ||
|
|
||
| DistributionSamplingRateLimiterImpl::DistributionSamplingRateLimiterImpl( | ||
| Envoy::TimeSource& time_source, DiscreteNumericDistributionSamplerPtr&& provider, | ||
| RateLimiterPtr&& rate_limiter) | ||
| : DelegatingRateLimiter( | ||
| time_source, std::move(rate_limiter), | ||
| DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter) | ||
| : DelegatingRateLimiterImpl( | ||
| std::move(rate_limiter), | ||
| [this]() { return std::chrono::duration<uint64_t, std::nano>(provider_->getValue()); }), | ||
| provider_(std::move(provider)) {} | ||
|
|
||
| FilteringRateLimiterImpl::FilteringRateLimiterImpl(RateLimiterPtr&& rate_limiter, | ||
| RateLimiterFilter filter) | ||
| : ForwardingRateLimiterImpl(std::move(rate_limiter)), filter_(std::move(filter)) {} | ||
|
|
||
| bool FilteringRateLimiterImpl::tryAcquireOne() { | ||
| return rate_limiter_->tryAcquireOne() ? filter_() : false; | ||
| } | ||
|
|
||
| } // namespace Nighthawk | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,60 @@ | |
|
|
||
| namespace Nighthawk { | ||
|
|
||
| /** | ||
| * Rate limiter base class, which implements some shared functionality. | ||
| */ | ||
| class RateLimiterBaseImpl : public RateLimiter { | ||
| public: | ||
| RateLimiterBaseImpl(Envoy::TimeSource& time_source) : time_source_(time_source){}; | ||
| Envoy::TimeSource& timeSource() override { return time_source_; } | ||
| std::chrono::nanoseconds elapsed() override { | ||
| // TODO(oschaaf): consider adding an explicit start() call to the interface. | ||
| const auto now = time_source_.monotonicTime(); | ||
| if (start_time_ == absl::nullopt) { | ||
| start_time_ = now; | ||
| } | ||
| return now - start_time_.value(); | ||
| } | ||
|
|
||
| private: | ||
| Envoy::TimeSource& time_source_; | ||
| absl::optional<Envoy::MonotonicTime> start_time_; | ||
| }; | ||
|
|
||
| /** | ||
| * Simple rate limiter that will allow acquiring at a linear pace. | ||
| * The average rate is computed over a timeframe that starts at | ||
| * the first call to tryAcquireOne(). | ||
| */ | ||
| class LinearRateLimiter : public RateLimiterBaseImpl, | ||
| public Envoy::Logger::Loggable<Envoy::Logger::Id::main> { | ||
| public: | ||
| LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency); | ||
| bool tryAcquireOne() override; | ||
| void releaseOne() override; | ||
|
|
||
| protected: | ||
| int64_t acquireable_count_{0}; | ||
| uint64_t acquired_count_{0}; | ||
| const Frequency frequency_; | ||
| }; | ||
|
|
||
| /** | ||
| * Base for a rate limiter which wraps another rate limiter, and forwards | ||
| * some calls. | ||
| */ | ||
| class ForwardingRateLimiterImpl : public RateLimiter { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If RateLimiterBaseImpl is the base RateLimiter class that we're working with, should this rate limiter also extend from it? Would it make sense to add a comment explaining why it doesn't? If it's because RateLimiterBaseImpl is too specific to be pulled from here, I might suggest that BaseImpl is a counterintuitive name for it.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can see your point, but I have a hard time coming up with a better name; hopefully the comment
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that your added comment clarifies things nicely. Thank you |
||
| public: | ||
| ForwardingRateLimiterImpl(RateLimiterPtr&& rate_limiter) | ||
| : rate_limiter_(std::move(rate_limiter)) {} | ||
| Envoy::TimeSource& timeSource() override { return rate_limiter_->timeSource(); } | ||
| std::chrono::nanoseconds elapsed() override { return rate_limiter_->elapsed(); } | ||
|
|
||
| protected: | ||
| const RateLimiterPtr rate_limiter_; | ||
| }; | ||
|
|
||
| /** | ||
| * BurstingRatelimiter can be wrapped around another rate limiter. It has two modes: | ||
| * 1. First it will be accumulating acquisitions by forwarding calls to the wrapped | ||
|
|
@@ -22,68 +76,50 @@ namespace Nighthawk { | |
| * acquisition calls (returning true and substracting from the accumulated total until | ||
| * nothing is left, after which mode 1 will be entered again). | ||
| */ | ||
| class BurstingRateLimiter : public RateLimiter, | ||
| class BurstingRateLimiter : public ForwardingRateLimiterImpl, | ||
| public Envoy::Logger::Loggable<Envoy::Logger::Id::main> { | ||
| public: | ||
| BurstingRateLimiter(RateLimiterPtr&& rate_limiter, const uint64_t burst_size); | ||
| bool tryAcquireOne() override; | ||
| void releaseOne() override; | ||
|
|
||
| private: | ||
| const RateLimiterPtr rate_limiter_; | ||
| const uint64_t burst_size_; | ||
| uint64_t accumulated_{0}; | ||
| bool releasing_{}; | ||
| absl::optional<bool> previously_releasing_; // Solely used for sanity checking. | ||
| }; | ||
|
|
||
| // Simple rate limiter that will allow acquiring at a linear pace. | ||
| // The average rate is computed over a timeframe that starts at | ||
| // the first call to tryAcquireOne(). | ||
| class LinearRateLimiter : public RateLimiter, | ||
| public Envoy::Logger::Loggable<Envoy::Logger::Id::main> { | ||
| public: | ||
| LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency); | ||
| bool tryAcquireOne() override; | ||
| void releaseOne() override; | ||
|
|
||
| private: | ||
| Envoy::TimeSource& time_source_; | ||
| int64_t acquireable_count_; | ||
| uint64_t acquired_count_; | ||
| const Frequency frequency_; | ||
| bool started_{}; | ||
| Envoy::MonotonicTime started_at_; | ||
| }; | ||
|
|
||
| // We use an unsigned duration here to ensure only future points in time will be yielded. | ||
| // The consuming rate limiter will hold off opening up until the initial point in time plus the | ||
| // offset obtained via the delegate have transpired. | ||
| /** | ||
| * The consuming rate limiter will hold off opening up until the initial point in time plus the | ||
| * offset obtained via the delegate have transpired. | ||
| * We use an unsigned duration here to ensure only future points in time will be yielded. | ||
| */ | ||
| using RateLimiterDelegate = std::function<const std::chrono::duration<uint64_t, std::nano>()>; | ||
|
|
||
| // Wraps a rate limiter, and allows plugging in a delegate which will be queried to offset the | ||
| // timing of the underlying rate limiter. | ||
| class DelegatingRateLimiter : public RateLimiter, | ||
| public Envoy::Logger::Loggable<Envoy::Logger::Id::main> { | ||
| /** | ||
| * Wraps a rate limiter, and allows plugging in a delegate which will be queried to offset the | ||
| * timing of the underlying rate limiter. | ||
| */ | ||
| class DelegatingRateLimiterImpl : public ForwardingRateLimiterImpl, | ||
| public Envoy::Logger::Loggable<Envoy::Logger::Id::main> { | ||
| public: | ||
| DelegatingRateLimiter(Envoy::TimeSource& time_source, RateLimiterPtr&& rate_limiter, | ||
| RateLimiterDelegate random_distribution_generator); | ||
| DelegatingRateLimiterImpl(RateLimiterPtr&& rate_limiter, | ||
| RateLimiterDelegate random_distribution_generator); | ||
| bool tryAcquireOne() override; | ||
| void releaseOne() override; | ||
|
|
||
| protected: | ||
| const RateLimiterDelegate random_distribution_generator_; | ||
|
|
||
| private: | ||
| Envoy::TimeSource& time_source_; | ||
| const RateLimiterPtr rate_limiter_; | ||
| absl::optional<Envoy::MonotonicTime> distributed_start_; | ||
| }; | ||
|
|
||
| class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionSampler { | ||
| public: | ||
| UniformRandomDistributionSamplerImpl(const std::chrono::duration<uint64_t, std::nano> upper_bound) | ||
| : distribution_(0, upper_bound.count()) {} | ||
| UniformRandomDistributionSamplerImpl(const uint64_t upper_bound) | ||
| : distribution_(0, upper_bound) {} | ||
| uint64_t getValue() override { return distribution_(generator_); } | ||
|
|
||
| private: | ||
|
|
@@ -92,14 +128,31 @@ class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionS | |
| }; | ||
|
|
||
| // Allows adding uniformly distributed random timing offsets to an underlying rate limiter. | ||
| class DistributionSamplingRateLimiterImpl : public DelegatingRateLimiter { | ||
| class DistributionSamplingRateLimiterImpl : public DelegatingRateLimiterImpl { | ||
| public: | ||
| DistributionSamplingRateLimiterImpl(Envoy::TimeSource& time_source, | ||
| DiscreteNumericDistributionSamplerPtr&& provider, | ||
| DistributionSamplingRateLimiterImpl(DiscreteNumericDistributionSamplerPtr&& provider, | ||
| RateLimiterPtr&& rate_limiter); | ||
|
|
||
| private: | ||
| DiscreteNumericDistributionSamplerPtr provider_; | ||
| }; | ||
|
|
||
| /** | ||
| * Callback used to indicate if a rate limiter release should be supressed or not. | ||
| */ | ||
| using RateLimiterFilter = std::function<bool()>; | ||
|
|
||
| // Wraps a rate limiter, and allows plugging in a delegate which will be queried to apply a | ||
| // filter to acquisitions. | ||
| class FilteringRateLimiterImpl : public ForwardingRateLimiterImpl, | ||
| public Envoy::Logger::Loggable<Envoy::Logger::Id::main> { | ||
| public: | ||
| FilteringRateLimiterImpl(RateLimiterPtr&& rate_limiter, RateLimiterFilter filter); | ||
| bool tryAcquireOne() override; | ||
| void releaseOne() override { rate_limiter_->releaseOne(); } | ||
|
|
||
| protected: | ||
| const RateLimiterFilter filter_; | ||
| }; | ||
|
|
||
| } // namespace Nighthawk | ||
Uh oh!
There was an error while loading. Please reload this page.