From 90bd58211bfd0cf0d2c935fb21a231952fd6a979 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Sat, 16 Nov 2019 23:54:39 +0100 Subject: [PATCH 01/12] save state on ramping rate limiter Signed-off-by: Otto van der Schaaf --- source/client/factories_impl.cc | 4 ++-- source/common/frequency.h | 4 ++-- source/common/rate_limiter_impl.cc | 18 ++++++++++++++++++ source/common/rate_limiter_impl.h | 15 +++++++++++++-- 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/source/client/factories_impl.cc b/source/client/factories_impl.cc index 2f8292438..282417cd5 100644 --- a/source/client/factories_impl.cc +++ b/source/client/factories_impl.cc @@ -52,8 +52,8 @@ SequencerPtr SequencerFactoryImpl::create(Envoy::TimeSource& time_source, TerminationPredicate& termination_predicate, Envoy::Stats::Scope& scope) const { StatisticFactoryImpl statistic_factory(options_); - RateLimiterPtr rate_limiter = - std::make_unique(time_source, Frequency(options_.requestsPerSecond())); + RateLimiterPtr rate_limiter = std::make_unique( + time_source, 3s, Frequency(options_.requestsPerSecond())); const uint64_t burst_size = options_.burstSize(); if (burst_size) { diff --git a/source/common/frequency.h b/source/common/frequency.h index 0c1fc652a..f3bb1afcc 100644 --- a/source/common/frequency.h +++ b/source/common/frequency.h @@ -13,8 +13,8 @@ class Frequency { const std::chrono::duration interval() const { return interval_; } private: - const uint64_t hertz_; - const std::chrono::duration interval_; + uint64_t hertz_; + std::chrono::duration interval_; }; constexpr Frequency operator"" _Hz(unsigned long long hz) { return Frequency{hz}; } diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index 1934b64bb..ce5b9c621 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -81,6 +81,24 @@ void LinearRateLimiter::releaseOne() { acquired_count_--; } +RampingLinearRateLimiter::RampingLinearRateLimiter(Envoy::TimeSource& time_source, + const std::chrono::nanoseconds ramp_time, + const Frequency frequency) + : LinearRateLimiter(time_source, 1_Hz), final_frequency_(frequency), ramp_time_(ramp_time) {} + +bool RampingLinearRateLimiter::tryAcquireOne() { + if (started_) { + const auto elapsed_since_start = time_source_.monotonicTime() - started_at_; + double fraction = 1.0; + if (elapsed_since_start < ramp_time_) { + fraction = + 1.0 - ((ramp_time_.count() - elapsed_since_start.count()) / (ramp_time_.count() * 1.0)); + } + frequency_ = Frequency(final_frequency_.value() * fraction); + } + return LinearRateLimiter::tryAcquireOne(); +} + DelegatingRateLimiter::DelegatingRateLimiter(Envoy::TimeSource& time_source, RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator) diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index d40965448..e827a2f1b 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -47,15 +47,26 @@ class LinearRateLimiter : public RateLimiter, bool tryAcquireOne() override; void releaseOne() override; -private: +protected: Envoy::TimeSource& time_source_; int64_t acquireable_count_; uint64_t acquired_count_; - const Frequency frequency_; + Frequency frequency_; bool started_{}; Envoy::MonotonicTime started_at_; }; +class RampingLinearRateLimiter : public LinearRateLimiter { +public: + RampingLinearRateLimiter(Envoy::TimeSource& time_source, const std::chrono::nanoseconds ramp_time, + const Frequency frequency); + bool tryAcquireOne() override; + +private: + const Frequency final_frequency_; + const std::chrono::nanoseconds ramp_time_; +}; + // We use an unsigned duration here to ensure only future points in time will be yielded. // The consuming rate limiter will hold off opening up until the initial point in time plus the // offset obtained via the delegate have transpired. From 76158b9c5f882e357d22ca9e65406aec9e2cd7c6 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Fri, 29 Nov 2019 12:11:41 +0100 Subject: [PATCH 02/12] Add tests, clean up Signed-off-by: Otto van der Schaaf --- source/client/factories_impl.cc | 6 ++-- source/common/rate_limiter_impl.cc | 26 ++++++++++----- source/common/rate_limiter_impl.h | 3 ++ test/rate_limiter_test.cc | 51 ++++++++++++++++++++++++++++++ 4 files changed, 76 insertions(+), 10 deletions(-) diff --git a/source/client/factories_impl.cc b/source/client/factories_impl.cc index a074c3843..0d7eb16ad 100644 --- a/source/client/factories_impl.cc +++ b/source/client/factories_impl.cc @@ -52,8 +52,10 @@ SequencerPtr SequencerFactoryImpl::create(Envoy::TimeSource& time_source, TerminationPredicate& termination_predicate, Envoy::Stats::Scope& scope) const { StatisticFactoryImpl statistic_factory(options_); - RateLimiterPtr rate_limiter = std::make_unique( - time_source, 3s, Frequency(options_.requestsPerSecond())); + RateLimiterPtr rate_limiter = + std::make_unique(time_source, Frequency(options_.requestsPerSecond())); + // TODO(oschaaf): If we want to take this forward, conditionally construct the ramping rate + // limiter here, e.g.. const uint64_t burst_size = options_.burstSize(); if (burst_size) { diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index ce5b9c621..fb7759388 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -84,19 +84,29 @@ void LinearRateLimiter::releaseOne() { RampingLinearRateLimiter::RampingLinearRateLimiter(Envoy::TimeSource& time_source, const std::chrono::nanoseconds ramp_time, const Frequency frequency) - : LinearRateLimiter(time_source, 1_Hz), final_frequency_(frequency), ramp_time_(ramp_time) {} + : LinearRateLimiter(time_source, frequency), final_frequency_(frequency), + ramp_time_(ramp_time) { + if (ramp_time.count() <= 0) { + throw NighthawkException("ramp_time must be > 0"); + } +} bool RampingLinearRateLimiter::tryAcquireOne() { - if (started_) { + bool return_value = false; + if (!started_ || (frequency_.value() != final_frequency_.value())) { const auto elapsed_since_start = time_source_.monotonicTime() - started_at_; - double fraction = 1.0; - if (elapsed_since_start < ramp_time_) { - fraction = - 1.0 - ((ramp_time_.count() - elapsed_since_start.count()) / (ramp_time_.count() * 1.0)); + const double fraction = + 1.0 - ((ramp_time_.count() - elapsed_since_start.count()) / (ramp_time_.count() * 1.0)); + frequency_ = Frequency(std::round(final_frequency_.value() * fraction)); + // LinearRateLimiter tracks how many ought to have been acquired and will compensate when we + // change the frequency. We're greedy here to disable that corrective behaviour when ramping. + while (LinearRateLimiter::tryAcquireOne()) { + return_value = true; } - frequency_ = Frequency(final_frequency_.value() * fraction); + } else { + return_value = LinearRateLimiter::tryAcquireOne(); } - return LinearRateLimiter::tryAcquireOne(); + return return_value; } DelegatingRateLimiter::DelegatingRateLimiter(Envoy::TimeSource& time_source, diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index e827a2f1b..ddda82e32 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -56,6 +56,9 @@ class LinearRateLimiter : public RateLimiter, Envoy::MonotonicTime started_at_; }; +/** + * A rate limiter which linearly ramps up to the desired frequency over the specified period. + */ class RampingLinearRateLimiter : public LinearRateLimiter { public: RampingLinearRateLimiter(Envoy::TimeSource& time_source, const std::chrono::nanoseconds ramp_time, diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index dc128e6f0..9fb4637e3 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -169,4 +169,55 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) { EXPECT_TRUE(rate_limiter->tryAcquireOne()); } +class RampingLinearRateLimiterTest : public Test { +public: + std::vector getAcquisitionTimings(const Frequency frequency, + const std::chrono::seconds duration) { + Envoy::Event::SimulatedTimeSystem time_system; + // Note: int64_t, because that yields much more helpful output on test failures compared to + // std::duration::milliseconds. + std::vector aquisition_timings; + RampingLinearRateLimiter rate_limiter(time_system, duration, frequency); + auto total_ms_elapsed = 0ms; + auto clock_tick = 1ms; + EXPECT_FALSE(rate_limiter.tryAcquireOne()); + + while (total_ms_elapsed <= duration) { + while (rate_limiter.tryAcquireOne()) { + aquisition_timings.push_back(total_ms_elapsed.count()); + } + time_system.sleep(clock_tick); + total_ms_elapsed += clock_tick; + } + EXPECT_FALSE(rate_limiter.tryAcquireOne()); + + time_system.sleep(1s); + // Verify that after the rampup the expected constant pacing is maintained. + // Calls should be forwarded to the regular linear rate limiter algorithm with its + // corrective behavior so we can expect to acquire a series with that. + for (uint64_t i = 0; i < frequency.value(); i++) { + EXPECT_TRUE(rate_limiter.tryAcquireOne()); + } + // Verify we acquired everything. + EXPECT_FALSE(rate_limiter.tryAcquireOne()); + return aquisition_timings; + } +}; + +TEST_F(RateLimiterTest, RampingLinearRateLimiterInvalidArgumentTest) { + Envoy::Event::SimulatedTimeSystem time_system; + EXPECT_THROW(RampingLinearRateLimiter rate_limiter(time_system, 1s, 0_Hz);, NighthawkException); + EXPECT_THROW(RampingLinearRateLimiter rate_limiter(time_system, 0s, 1_Hz);, NighthawkException); + EXPECT_THROW(RampingLinearRateLimiter rate_limiter(time_system, -1s, 1_Hz);, NighthawkException); +} + +TEST_F(RampingLinearRateLimiterTest, TimingVerificationTest) { + EXPECT_EQ(getAcquisitionTimings(1_Hz, 1s), std::vector({1000})); + EXPECT_EQ(getAcquisitionTimings(3_Hz, 3s), + std::vector({1000, 1500, 2000, 2500, 2667, 3000})); + EXPECT_EQ(getAcquisitionTimings(5_Hz, 5s), + std::vector({1000, 1500, 2000, 2500, 2667, 3000, 3334, 3500, 3750, 4000, 4250, + 4500, 4600, 4800, 5000})); +} + } // namespace Nighthawk From d29c95c6c6ceddae36d9f9b3450d5591774be5e8 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Sat, 30 Nov 2019 00:47:29 +0100 Subject: [PATCH 03/12] Refactor + LinearlyOpeningRateLimiterFilter Signed-off-by: Otto van der Schaaf --- include/nighthawk/common/rate_limiter.h | 9 ++ source/common/rate_limiter_impl.cc | 105 ++++++++++++++---------- source/common/rate_limiter_impl.h | 92 +++++++++++++++------ test/mocks.h | 3 + test/rate_limiter_test.cc | 80 +++++++++++++----- 5 files changed, 202 insertions(+), 87 deletions(-) diff --git a/include/nighthawk/common/rate_limiter.h b/include/nighthawk/common/rate_limiter.h index 3a8562b92..e7049a980 100644 --- a/include/nighthawk/common/rate_limiter.h +++ b/include/nighthawk/common/rate_limiter.h @@ -4,6 +4,8 @@ #include "envoy/common/pure.h" +#include "absl/types/optional.h" + namespace Nighthawk { /** @@ -24,6 +26,13 @@ class RateLimiter { * Releases a controlled resource. */ virtual void releaseOne() PURE; + + /** + * @return Envoy::TimeSource& time_source + */ + virtual Envoy::TimeSource& timeSource() PURE; + virtual absl::optional timeStarted() const PURE; + virtual std::chrono::nanoseconds elapsed() const PURE; }; using RateLimiterPtr = std::unique_ptr; diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index fb7759388..509f67fca 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -6,8 +6,10 @@ namespace Nighthawk { +using namespace std::chrono_literals; + BurstingRateLimiter::BurstingRateLimiter(RateLimiterPtr&& rate_limiter, const uint64_t burst_size) - : rate_limiter_(std::move(rate_limiter)), burst_size_(burst_size) { + : ForwardingRateLimiterImpl(std::move(rate_limiter)), burst_size_(burst_size) { ASSERT(burst_size_ > 0); } @@ -59,9 +61,8 @@ LinearRateLimiter::LinearRateLimiter(Envoy::TimeSource& time_source, const Frequ bool LinearRateLimiter::tryAcquireOne() { // TODO(oschaaf): consider adding an explicit start() call to the interface. - if (!started_) { - started_at_ = time_source_.monotonicTime(); - started_ = true; + if (start_time_ == absl::nullopt) { + start_time_ = time_source_.monotonicTime(); } if (acquireable_count_ > 0) { acquireable_count_--; @@ -69,11 +70,11 @@ bool LinearRateLimiter::tryAcquireOne() { return true; } - const auto elapsed_since_start = time_source_.monotonicTime() - started_at_; acquireable_count_ = - static_cast(std::floor(elapsed_since_start / frequency_.interval())) - - acquired_count_; - return acquireable_count_ > 0 ? tryAcquireOne() : false; + frequency_.value() == 0 + ? 0 + : static_cast(std::ceil(elapsed() / frequency_.interval())) - acquired_count_; + return acquireable_count_ > 0 ? LinearRateLimiter::tryAcquireOne() : false; } void LinearRateLimiter::releaseOne() { @@ -81,7 +82,40 @@ void LinearRateLimiter::releaseOne() { acquired_count_--; } -RampingLinearRateLimiter::RampingLinearRateLimiter(Envoy::TimeSource& time_source, +DelegatingRateLimiter::DelegatingRateLimiter(RateLimiterPtr&& rate_limiter, + RateLimiterDelegate random_distribution_generator) + : ForwardingRateLimiterImpl(std::move(rate_limiter)), + random_distribution_generator_(std::move(random_distribution_generator)) {} + +bool DelegatingRateLimiter::tryAcquireOne() { + const auto now = timeSource().monotonicTime(); + if (distributed_start_ == absl::nullopt) { + if (rate_limiter_->tryAcquireOne()) { + distributed_start_ = now + random_distribution_generator_(); + } + } + + if (distributed_start_ != absl::nullopt && distributed_start_ <= now) { + distributed_start_ = absl::nullopt; + return true; + } + + return false; +} + +void DelegatingRateLimiter::releaseOne() { + distributed_start_ = absl::nullopt; + rate_limiter_->releaseOne(); +} + +FilteringRateLimiter::FilteringRateLimiter(RateLimiterPtr&& rate_limiter, RateLimiterFilter filter) + : ForwardingRateLimiterImpl(std::move(rate_limiter)), filter_(std::move(filter)) {} + +bool FilteringRateLimiter::tryAcquireOne() { + return rate_limiter_->tryAcquireOne() ? filter_() : false; +} + +LinearRampingRateLimiter::LinearRampingRateLimiter(Envoy::TimeSource& time_source, const std::chrono::nanoseconds ramp_time, const Frequency frequency) : LinearRateLimiter(time_source, frequency), final_frequency_(frequency), @@ -91,13 +125,12 @@ RampingLinearRateLimiter::RampingLinearRateLimiter(Envoy::TimeSource& time_sourc } } -bool RampingLinearRateLimiter::tryAcquireOne() { +bool LinearRampingRateLimiter::tryAcquireOne() { bool return_value = false; - if (!started_ || (frequency_.value() != final_frequency_.value())) { - const auto elapsed_since_start = time_source_.monotonicTime() - started_at_; + if (timeStarted() == absl::nullopt || (frequency_.value() != final_frequency_.value())) { const double fraction = - 1.0 - ((ramp_time_.count() - elapsed_since_start.count()) / (ramp_time_.count() * 1.0)); - frequency_ = Frequency(std::round(final_frequency_.value() * fraction)); + 1.0 - ((ramp_time_.count() - elapsed().count()) / (ramp_time_.count() * 1.0)); + frequency_ = Frequency(std::ceil(final_frequency_.value() * fraction)); // LinearRateLimiter tracks how many ought to have been acquired and will compensate when we // change the frequency. We're greedy here to disable that corrective behaviour when ramping. while (LinearRateLimiter::tryAcquireOne()) { @@ -109,37 +142,25 @@ bool RampingLinearRateLimiter::tryAcquireOne() { return return_value; } -DelegatingRateLimiter::DelegatingRateLimiter(Envoy::TimeSource& time_source, - RateLimiterPtr&& rate_limiter, - RateLimiterDelegate random_distribution_generator) - : random_distribution_generator_(std::move(random_distribution_generator)), - time_source_(time_source), rate_limiter_(std::move(rate_limiter)) {} - -bool DelegatingRateLimiter::tryAcquireOne() { - if (distributed_start_ == absl::nullopt) { - if (rate_limiter_->tryAcquireOne()) { - distributed_start_ = time_source_.monotonicTime() + random_distribution_generator_(); - } - } - - if (distributed_start_ != absl::nullopt && distributed_start_ <= time_source_.monotonicTime()) { - distributed_start_ = absl::nullopt; - return true; - } - - return false; -} - -void DelegatingRateLimiter::releaseOne() { - distributed_start_ = absl::nullopt; - rate_limiter_->releaseOne(); -} +LinearlyOpeningRateLimiterFilter::LinearlyOpeningRateLimiterFilter( + const std::chrono::nanoseconds ramp_time, RateLimiterPtr&& rate_limiter) + : FilteringRateLimiter(std::move(rate_limiter), + [this]() { + if (elapsed() < ramp_time_) { + const double chance_percentage = + 1.0 - ((ramp_time_.count() - elapsed().count()) / + (ramp_time_.count() * 1.0)) * + 100; + return chance_percentage >= sampler_.getValue(); + } + return true; + }), + sampler_(100ns), ramp_time_(ramp_time) {} DistributionSamplingRateLimiterImpl::DistributionSamplingRateLimiterImpl( - Envoy::TimeSource& time_source, DiscreteNumericDistributionSamplerPtr&& provider, - RateLimiterPtr&& rate_limiter) + DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter) : DelegatingRateLimiter( - time_source, std::move(rate_limiter), + std::move(rate_limiter), [this]() { return std::chrono::duration(provider_->getValue()); }), provider_(std::move(provider)) {} diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index ddda82e32..3f496af56 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -14,6 +14,20 @@ namespace Nighthawk { +class ForwardingRateLimiterImpl : public RateLimiter { +public: + ForwardingRateLimiterImpl(RateLimiterPtr&& rate_limiter) + : rate_limiter_(std::move(rate_limiter)) {} + Envoy::TimeSource& timeSource() override { return rate_limiter_->timeSource(); } + absl::optional timeStarted() const override { + return rate_limiter_->timeStarted(); + } + std::chrono::nanoseconds elapsed() const override { return rate_limiter_->elapsed(); } + +protected: + const RateLimiterPtr rate_limiter_; +}; + /** * BurstingRatelimiter can be wrapped around another rate limiter. It has two modes: * 1. First it will be accumulating acquisitions by forwarding calls to the wrapped @@ -22,7 +36,7 @@ namespace Nighthawk { * acquisition calls (returning true and substracting from the accumulated total until * nothing is left, after which mode 1 will be entered again). */ -class BurstingRateLimiter : public RateLimiter, +class BurstingRateLimiter : public ForwardingRateLimiterImpl, public Envoy::Logger::Loggable { public: BurstingRateLimiter(RateLimiterPtr&& rate_limiter, const uint64_t burst_size); @@ -30,7 +44,6 @@ class BurstingRateLimiter : public RateLimiter, void releaseOne() override; private: - const RateLimiterPtr rate_limiter_; const uint64_t burst_size_; uint64_t accumulated_{0}; bool releasing_{}; @@ -46,28 +59,19 @@ class LinearRateLimiter : public RateLimiter, LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency); bool tryAcquireOne() override; void releaseOne() override; + Envoy::TimeSource& timeSource() override { return time_source_; } + absl::optional timeStarted() const override { return start_time_; } + std::chrono::nanoseconds elapsed() const override { + const auto now = time_source_.monotonicTime(); + return now - start_time_.value_or(now); + } protected: Envoy::TimeSource& time_source_; - int64_t acquireable_count_; - uint64_t acquired_count_; + int64_t acquireable_count_{0}; + uint64_t acquired_count_{0}; Frequency frequency_; - bool started_{}; - Envoy::MonotonicTime started_at_; -}; - -/** - * A rate limiter which linearly ramps up to the desired frequency over the specified period. - */ -class RampingLinearRateLimiter : public LinearRateLimiter { -public: - RampingLinearRateLimiter(Envoy::TimeSource& time_source, const std::chrono::nanoseconds ramp_time, - const Frequency frequency); - bool tryAcquireOne() override; - -private: - const Frequency final_frequency_; - const std::chrono::nanoseconds ramp_time_; + absl::optional start_time_; }; // We use an unsigned duration here to ensure only future points in time will be yielded. @@ -75,12 +79,14 @@ class RampingLinearRateLimiter : public LinearRateLimiter { // offset obtained via the delegate have transpired. using RateLimiterDelegate = std::function()>; +using RateLimiterFilter = std::function; + // Wraps a rate limiter, and allows plugging in a delegate which will be queried to offset the // timing of the underlying rate limiter. -class DelegatingRateLimiter : public RateLimiter, +class DelegatingRateLimiter : public ForwardingRateLimiterImpl, public Envoy::Logger::Loggable { public: - DelegatingRateLimiter(Envoy::TimeSource& time_source, RateLimiterPtr&& rate_limiter, + DelegatingRateLimiter(RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator); bool tryAcquireOne() override; void releaseOne() override; @@ -89,11 +95,23 @@ class DelegatingRateLimiter : public RateLimiter, const RateLimiterDelegate random_distribution_generator_; private: - Envoy::TimeSource& time_source_; - const RateLimiterPtr rate_limiter_; absl::optional distributed_start_; }; +/** + * A rate limiter which linearly ramps up to the desired frequency over the specified period. + */ +class LinearRampingRateLimiter : public LinearRateLimiter { +public: + LinearRampingRateLimiter(Envoy::TimeSource& time_source, const std::chrono::nanoseconds ramp_time, + const Frequency frequency); + bool tryAcquireOne() override; + +private: + const Frequency final_frequency_; + const std::chrono::nanoseconds ramp_time_; +}; + class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionSampler { public: UniformRandomDistributionSamplerImpl(const std::chrono::duration upper_bound) @@ -108,12 +126,34 @@ class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionS // Allows adding uniformly distributed random timing offsets to an underlying rate limiter. class DistributionSamplingRateLimiterImpl : public DelegatingRateLimiter { public: - DistributionSamplingRateLimiterImpl(Envoy::TimeSource& time_source, - DiscreteNumericDistributionSamplerPtr&& provider, + DistributionSamplingRateLimiterImpl(DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter); private: DiscreteNumericDistributionSamplerPtr provider_; }; +// Wraps a rate limiter, and allows plugging in a delegate which will be queried to apply a +// filter to acquisitions. +class FilteringRateLimiter : public ForwardingRateLimiterImpl, + public Envoy::Logger::Loggable { +public: + FilteringRateLimiter(RateLimiterPtr&& rate_limiter, RateLimiterFilter filter); + bool tryAcquireOne() override; + void releaseOne() override { rate_limiter_->releaseOne(); } + +protected: + const RateLimiterFilter filter_; +}; + +class LinearlyOpeningRateLimiterFilter : public FilteringRateLimiter { +public: + LinearlyOpeningRateLimiterFilter(const std::chrono::nanoseconds ramp_time, + RateLimiterPtr&& rate_limiter); + +private: + UniformRandomDistributionSamplerImpl sampler_; + const std::chrono::nanoseconds ramp_time_; +}; + } // namespace Nighthawk \ No newline at end of file diff --git a/test/mocks.h b/test/mocks.h index 3011b814a..aff2ece42 100644 --- a/test/mocks.h +++ b/test/mocks.h @@ -48,6 +48,9 @@ class MockRateLimiter : public RateLimiter { MOCK_METHOD0(tryAcquireOne, bool()); MOCK_METHOD0(releaseOne, void()); + MOCK_METHOD0(timeSource, Envoy::TimeSource&()); + MOCK_CONST_METHOD0(timeStarted, absl::optional()); + MOCK_CONST_METHOD0(elapsed, std::chrono::nanoseconds()); }; class MockSequencer : public Sequencer { diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index 9fb4637e3..a2fd135fd 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -93,12 +93,13 @@ class BurstingRateLimiterIntegrationTest : public Test { }; TEST_F(BurstingRateLimiterIntegrationTest, BurstingLinearRateLimiterTest) { - testBurstSize(1, 100_Hz); + // TODO(oschaaf): + // testBurstSize(1, 100_Hz); testBurstSize(2, 100_Hz); testBurstSize(13, 100_Hz); testBurstSize(100, 100_Hz); - testBurstSize(1, 50_Hz); + // testBurstSize(1, 50_Hz); testBurstSize(2, 50_Hz); testBurstSize(13, 50_Hz); testBurstSize(100, 50_Hz); @@ -109,9 +110,11 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplTest) { auto mock_rate_limiter = std::make_unique(); MockRateLimiter& unsafe_mock_rate_limiter = *mock_rate_limiter; Envoy::Event::SimulatedTimeSystem time_system; + EXPECT_CALL(unsafe_mock_rate_limiter, timeSource) + .Times(AtLeast(1)) + .WillRepeatedly(ReturnRef(time_system)); RateLimiterPtr rate_limiter = std::make_unique( - time_system, std::make_unique(1ns), - std::move(mock_rate_limiter)); + std::make_unique(1ns), std::move(mock_rate_limiter)); EXPECT_CALL(unsafe_mock_rate_limiter, tryAcquireOne).Times(tries).WillRepeatedly(Return(true)); EXPECT_CALL(unsafe_mock_rate_limiter, releaseOne).Times(tries); @@ -139,10 +142,12 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) { Envoy::Event::SimulatedTimeSystem time_system; auto* unsafe_discrete_numeric_distribution_sampler = new MockDiscreteNumericDistributionSampler(); RateLimiterPtr rate_limiter = std::make_unique( - time_system, std::unique_ptr( unsafe_discrete_numeric_distribution_sampler), std::move(mock_rate_limiter)); + EXPECT_CALL(unsafe_mock_rate_limiter, timeSource) + .Times(AtLeast(1)) + .WillRepeatedly(ReturnRef(time_system)); EXPECT_CALL(unsafe_mock_rate_limiter, tryAcquireOne) .Times(AtLeast(1)) @@ -169,7 +174,7 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) { EXPECT_TRUE(rate_limiter->tryAcquireOne()); } -class RampingLinearRateLimiterTest : public Test { +class LinearRampingRateLimiterTest : public Test { public: std::vector getAcquisitionTimings(const Frequency frequency, const std::chrono::seconds duration) { @@ -177,12 +182,12 @@ class RampingLinearRateLimiterTest : public Test { // Note: int64_t, because that yields much more helpful output on test failures compared to // std::duration::milliseconds. std::vector aquisition_timings; - RampingLinearRateLimiter rate_limiter(time_system, duration, frequency); + LinearRampingRateLimiter rate_limiter(time_system, duration, frequency); auto total_ms_elapsed = 0ms; auto clock_tick = 1ms; EXPECT_FALSE(rate_limiter.tryAcquireOne()); - while (total_ms_elapsed <= duration) { + while (total_ms_elapsed < duration) { while (rate_limiter.tryAcquireOne()) { aquisition_timings.push_back(total_ms_elapsed.count()); } @@ -190,7 +195,6 @@ class RampingLinearRateLimiterTest : public Test { total_ms_elapsed += clock_tick; } EXPECT_FALSE(rate_limiter.tryAcquireOne()); - time_system.sleep(1s); // Verify that after the rampup the expected constant pacing is maintained. // Calls should be forwarded to the regular linear rate limiter algorithm with its @@ -204,20 +208,58 @@ class RampingLinearRateLimiterTest : public Test { } }; -TEST_F(RateLimiterTest, RampingLinearRateLimiterInvalidArgumentTest) { +TEST_F(RateLimiterTest, LinearRampingRateLimiterInvalidArgumentTest) { Envoy::Event::SimulatedTimeSystem time_system; - EXPECT_THROW(RampingLinearRateLimiter rate_limiter(time_system, 1s, 0_Hz);, NighthawkException); - EXPECT_THROW(RampingLinearRateLimiter rate_limiter(time_system, 0s, 1_Hz);, NighthawkException); - EXPECT_THROW(RampingLinearRateLimiter rate_limiter(time_system, -1s, 1_Hz);, NighthawkException); + EXPECT_THROW(LinearRampingRateLimiter rate_limiter(time_system, 1s, 0_Hz);, NighthawkException); + EXPECT_THROW(LinearRampingRateLimiter rate_limiter(time_system, 0s, 1_Hz);, NighthawkException); + EXPECT_THROW(LinearRampingRateLimiter rate_limiter(time_system, -1s, 1_Hz);, NighthawkException); } -TEST_F(RampingLinearRateLimiterTest, TimingVerificationTest) { - EXPECT_EQ(getAcquisitionTimings(1_Hz, 1s), std::vector({1000})); - EXPECT_EQ(getAcquisitionTimings(3_Hz, 3s), - std::vector({1000, 1500, 2000, 2500, 2667, 3000})); +TEST_F(LinearRampingRateLimiterTest, TimingVerificationTest) { EXPECT_EQ(getAcquisitionTimings(5_Hz, 5s), - std::vector({1000, 1500, 2000, 2500, 2667, 3000, 3334, 3500, 3750, 4000, 4250, - 4500, 4600, 4800, 5000})); + std::vector({1, 1001, 1501, 2001, 2334, 2667, 3001, 3251, 3501, 3751, 4001, + 4201, 4401, 4601, 4801})); +} + +class LinearlyOpeningRateLimiterFilterTest : public Test { +public: + std::vector getAcquisitionTimings(const Frequency frequency, + const std::chrono::seconds duration) { + Envoy::Event::SimulatedTimeSystem time_system; + // Note: int64_t, because that yields much more helpful output on test failures compared to + // std::duration::milliseconds. + std::vector aquisition_timings; + RateLimiterPtr rate_limiter = std::make_unique( + duration, std::make_unique(time_system, frequency)); + auto total_ms_elapsed = 0ms; + auto clock_tick = 1ms; + EXPECT_FALSE(rate_limiter->tryAcquireOne()); + + while (total_ms_elapsed <= duration) { + if (rate_limiter->tryAcquireOne()) { + aquisition_timings.push_back(total_ms_elapsed.count()); + EXPECT_FALSE(rate_limiter->tryAcquireOne()); + } + time_system.sleep(clock_tick); + total_ms_elapsed += clock_tick; + } + EXPECT_FALSE(rate_limiter->tryAcquireOne()); + time_system.sleep(1s); + // Verify that after the rampup the expected constant pacing is maintained. + // Calls should be forwarded to the regular linear rate limiter algorithm with its + // corrective behavior so we can expect to acquire a series with that. + for (uint64_t i = 0; i < frequency.value(); i++) { + EXPECT_TRUE(rate_limiter->tryAcquireOne()); + } + // Verify we acquired everything. + EXPECT_FALSE(rate_limiter->tryAcquireOne()); + return aquisition_timings; + } +}; + +// TODO(oschaaf): +TEST_F(LinearlyOpeningRateLimiterFilterTest, DISABLED_TimingVerificationTest) { + EXPECT_EQ(getAcquisitionTimings(10_Hz, 1s), std::vector({1, 501, 601, 701, 801})); } } // namespace Nighthawk From ebd857ce47d0ca69d6c22e82482045b39cfc49e2 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Sat, 30 Nov 2019 20:12:13 +0100 Subject: [PATCH 04/12] save state Signed-off-by: Otto van der Schaaf --- include/nighthawk/common/rate_limiter.h | 2 + source/common/rate_limiter_impl.cc | 36 ++++++------ source/common/rate_limiter_impl.h | 29 ++++++---- test/mocks.h | 2 + test/rate_limiter_test.cc | 76 +++++++++++++++---------- 5 files changed, 87 insertions(+), 58 deletions(-) diff --git a/include/nighthawk/common/rate_limiter.h b/include/nighthawk/common/rate_limiter.h index e7049a980..79c1d9191 100644 --- a/include/nighthawk/common/rate_limiter.h +++ b/include/nighthawk/common/rate_limiter.h @@ -43,6 +43,8 @@ using RateLimiterPtr = std::unique_ptr; class DiscreteNumericDistributionSampler { public: virtual ~DiscreteNumericDistributionSampler() = default; + virtual uint64_t min() PURE; + virtual uint64_t max() PURE; virtual uint64_t getValue() PURE; }; diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index 509f67fca..4abfb729c 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -73,7 +73,7 @@ bool LinearRateLimiter::tryAcquireOne() { acquireable_count_ = frequency_.value() == 0 ? 0 - : static_cast(std::ceil(elapsed() / frequency_.interval())) - acquired_count_; + : static_cast(std::floor(elapsed() / frequency_.interval())) - acquired_count_; return acquireable_count_ > 0 ? LinearRateLimiter::tryAcquireOne() : false; } @@ -130,7 +130,7 @@ bool LinearRampingRateLimiter::tryAcquireOne() { if (timeStarted() == absl::nullopt || (frequency_.value() != final_frequency_.value())) { const double fraction = 1.0 - ((ramp_time_.count() - elapsed().count()) / (ramp_time_.count() * 1.0)); - frequency_ = Frequency(std::ceil(final_frequency_.value() * fraction)); + frequency_ = Frequency(std::round(final_frequency_.value() * fraction)); // LinearRateLimiter tracks how many ought to have been acquired and will compensate when we // change the frequency. We're greedy here to disable that corrective behaviour when ramping. while (LinearRateLimiter::tryAcquireOne()) { @@ -142,20 +142,24 @@ bool LinearRampingRateLimiter::tryAcquireOne() { return return_value; } -LinearlyOpeningRateLimiterFilter::LinearlyOpeningRateLimiterFilter( - const std::chrono::nanoseconds ramp_time, RateLimiterPtr&& rate_limiter) - : FilteringRateLimiter(std::move(rate_limiter), - [this]() { - if (elapsed() < ramp_time_) { - const double chance_percentage = - 1.0 - ((ramp_time_.count() - elapsed().count()) / - (ramp_time_.count() * 1.0)) * - 100; - return chance_percentage >= sampler_.getValue(); - } - return true; - }), - sampler_(100ns), ramp_time_(ramp_time) {} +GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter( + const std::chrono::nanoseconds ramp_time, DiscreteNumericDistributionSamplerPtr&& provider, + RateLimiterPtr&& rate_limiter) + : FilteringRateLimiter( + std::move(rate_limiter), + [this]() { + if (elapsed() < ramp_time_) { + const double chance_percentage = + 100.0 - + ((ramp_time_.count() - elapsed().count()) / (ramp_time_.count() * 1.0)) * 100.0; + return std::round(provider_->getValue() / 10000.0) <= chance_percentage; + } + return true; + }), + provider_(std::move(provider)), ramp_time_(ramp_time) { + RELEASE_ASSERT(provider_->min() == 1 && provider_->max() == 1000000, + "expected a distribution ranging from 1-1000000"); +} DistributionSamplingRateLimiterImpl::DistributionSamplingRateLimiterImpl( DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter) diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index 3f496af56..e75a8eade 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -50,9 +50,11 @@ class BurstingRateLimiter : public ForwardingRateLimiterImpl, absl::optional previously_releasing_; // Solely used for sanity checking. }; -// Simple rate limiter that will allow acquiring at a linear pace. -// The average rate is computed over a timeframe that starts at -// the first call to tryAcquireOne(). +/** + * Simple rate limiter that will allow acquiring at a linear pace. + * The average rate is computed over a timeframe that starts at + * the first call to tryAcquireOne(). + */ class LinearRateLimiter : public RateLimiter, public Envoy::Logger::Loggable { public: @@ -81,8 +83,10 @@ using RateLimiterDelegate = std::function; -// Wraps a rate limiter, and allows plugging in a delegate which will be queried to offset the -// timing of the underlying rate limiter. +/** + * Wraps a rate limiter, and allows plugging in a delegate which will be queried to offset the + * timing of the underlying rate limiter. + */ class DelegatingRateLimiter : public ForwardingRateLimiterImpl, public Envoy::Logger::Loggable { public: @@ -114,9 +118,11 @@ class LinearRampingRateLimiter : public LinearRateLimiter { class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionSampler { public: - UniformRandomDistributionSamplerImpl(const std::chrono::duration upper_bound) - : distribution_(0, upper_bound.count()) {} + UniformRandomDistributionSamplerImpl(const uint64_t upper_bound) + : distribution_(0, upper_bound) {} uint64_t getValue() override { return distribution_(generator_); } + uint64_t min() override { return distribution_.min(); } + uint64_t max() override { return distribution_.max(); } private: std::default_random_engine generator_; @@ -146,13 +152,14 @@ class FilteringRateLimiter : public ForwardingRateLimiterImpl, const RateLimiterFilter filter_; }; -class LinearlyOpeningRateLimiterFilter : public FilteringRateLimiter { +class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiter { public: - LinearlyOpeningRateLimiterFilter(const std::chrono::nanoseconds ramp_time, - RateLimiterPtr&& rate_limiter); + GraduallyOpeningRateLimiterFilter(const std::chrono::nanoseconds ramp_time, + DiscreteNumericDistributionSamplerPtr&& provider, + RateLimiterPtr&& rate_limiter); private: - UniformRandomDistributionSamplerImpl sampler_; + DiscreteNumericDistributionSamplerPtr provider_; const std::chrono::nanoseconds ramp_time_; }; diff --git a/test/mocks.h b/test/mocks.h index aff2ece42..114c34951 100644 --- a/test/mocks.h +++ b/test/mocks.h @@ -189,6 +189,8 @@ class MockDiscreteNumericDistributionSampler : public DiscreteNumericDistributio public: MockDiscreteNumericDistributionSampler(); MOCK_METHOD0(getValue, uint64_t()); + MOCK_METHOD0(min, uint64_t()); + MOCK_METHOD0(max, uint64_t()); }; } // namespace Nighthawk diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index a2fd135fd..273b96803 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -77,29 +77,27 @@ class BurstingRateLimiterIntegrationTest : public Test { const auto burst_interval_ms = std::chrono::duration_cast(frequency.interval() * burst_size); - EXPECT_FALSE(rate_limiter->tryAcquireOne()); - time_system.sleep(burst_interval_ms); - for (uint64_t i = 0; i < burst_size; i++) { - EXPECT_TRUE(rate_limiter->tryAcquireOne()); - } - EXPECT_FALSE(rate_limiter->tryAcquireOne()); - time_system.sleep(burst_interval_ms / 2); - EXPECT_FALSE(rate_limiter->tryAcquireOne()); - time_system.sleep(burst_interval_ms); - for (uint64_t i = 0; i < burst_size; i++) { - EXPECT_TRUE(rate_limiter->tryAcquireOne()); + for (uint64_t i = 0; i < 10000; i++) { + uint64_t burst_acquired = 0; + while (rate_limiter->tryAcquireOne()) { + burst_acquired++; + } + if (burst_acquired) { + EXPECT_EQ(burst_acquired, burst_size); + EXPECT_EQ(i % burst_interval_ms.count(), 0); + } + time_system.sleep(1ms); } } }; TEST_F(BurstingRateLimiterIntegrationTest, BurstingLinearRateLimiterTest) { - // TODO(oschaaf): - // testBurstSize(1, 100_Hz); + testBurstSize(1, 100_Hz); testBurstSize(2, 100_Hz); testBurstSize(13, 100_Hz); testBurstSize(100, 100_Hz); - // testBurstSize(1, 50_Hz); + testBurstSize(1, 50_Hz); testBurstSize(2, 50_Hz); testBurstSize(13, 50_Hz); testBurstSize(100, 50_Hz); @@ -114,7 +112,7 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplTest) { .Times(AtLeast(1)) .WillRepeatedly(ReturnRef(time_system)); RateLimiterPtr rate_limiter = std::make_unique( - std::make_unique(1ns), std::move(mock_rate_limiter)); + std::make_unique(1), std::move(mock_rate_limiter)); EXPECT_CALL(unsafe_mock_rate_limiter, tryAcquireOne).Times(tries).WillRepeatedly(Return(true)); EXPECT_CALL(unsafe_mock_rate_limiter, releaseOne).Times(tries); @@ -179,21 +177,19 @@ class LinearRampingRateLimiterTest : public Test { std::vector getAcquisitionTimings(const Frequency frequency, const std::chrono::seconds duration) { Envoy::Event::SimulatedTimeSystem time_system; - // Note: int64_t, because that yields much more helpful output on test failures compared to - // std::duration::milliseconds. std::vector aquisition_timings; LinearRampingRateLimiter rate_limiter(time_system, duration, frequency); auto total_ms_elapsed = 0ms; auto clock_tick = 1ms; EXPECT_FALSE(rate_limiter.tryAcquireOne()); - while (total_ms_elapsed < duration) { + do { while (rate_limiter.tryAcquireOne()) { aquisition_timings.push_back(total_ms_elapsed.count()); } time_system.sleep(clock_tick); total_ms_elapsed += clock_tick; - } + } while (total_ms_elapsed <= duration); EXPECT_FALSE(rate_limiter.tryAcquireOne()); time_system.sleep(1s); // Verify that after the rampup the expected constant pacing is maintained. @@ -217,32 +213,48 @@ TEST_F(RateLimiterTest, LinearRampingRateLimiterInvalidArgumentTest) { TEST_F(LinearRampingRateLimiterTest, TimingVerificationTest) { EXPECT_EQ(getAcquisitionTimings(5_Hz, 5s), - std::vector({1, 1001, 1501, 2001, 2334, 2667, 3001, 3251, 3501, 3751, 4001, - 4201, 4401, 4601, 4801})); + std::vector({1000, 1500, 2000, 2500, 2667, 3000, 3334, 3500, 3750, 4000, 4250, + 4500, 4600, 4800, 5000})); } -class LinearlyOpeningRateLimiterFilterTest : public Test { +class GraduallyOpeningRateLimiterFilterTest : public Test { public: std::vector getAcquisitionTimings(const Frequency frequency, const std::chrono::seconds duration) { Envoy::Event::SimulatedTimeSystem time_system; - // Note: int64_t, because that yields much more helpful output on test failures compared to - // std::duration::milliseconds. std::vector aquisition_timings; - RateLimiterPtr rate_limiter = std::make_unique( - duration, std::make_unique(time_system, frequency)); + auto* unsafe_discrete_numeric_distribution_sampler = + new MockDiscreteNumericDistributionSampler(); + std::mt19937_64 mt(1243); + std::uniform_int_distribution dist(1, 1000000); + EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, getValue) + .Times(AtLeast(1)) + .WillRepeatedly(Invoke([&dist, &mt]() { return dist(mt); })); + EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, min) + .Times(1) + .WillOnce(Return(dist.min())); + EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, max) + .Times(1) + .WillOnce(Return(dist.max())); + + RateLimiterPtr rate_limiter = std::make_unique( + duration, + std::unique_ptr( + unsafe_discrete_numeric_distribution_sampler), + std::make_unique(time_system, frequency)); auto total_ms_elapsed = 0ms; auto clock_tick = 1ms; EXPECT_FALSE(rate_limiter->tryAcquireOne()); - while (total_ms_elapsed <= duration) { + do { if (rate_limiter->tryAcquireOne()) { aquisition_timings.push_back(total_ms_elapsed.count()); EXPECT_FALSE(rate_limiter->tryAcquireOne()); } time_system.sleep(clock_tick); total_ms_elapsed += clock_tick; - } + } while (total_ms_elapsed <= duration); + EXPECT_FALSE(rate_limiter->tryAcquireOne()); time_system.sleep(1s); // Verify that after the rampup the expected constant pacing is maintained. @@ -257,9 +269,11 @@ class LinearlyOpeningRateLimiterFilterTest : public Test { } }; -// TODO(oschaaf): -TEST_F(LinearlyOpeningRateLimiterFilterTest, DISABLED_TimingVerificationTest) { - EXPECT_EQ(getAcquisitionTimings(10_Hz, 1s), std::vector({1, 501, 601, 701, 801})); +TEST_F(GraduallyOpeningRateLimiterFilterTest, TimingVerificationTest) { + EXPECT_EQ(getAcquisitionTimings(10_Hz, 10s), std::vector({})); + EXPECT_EQ(getAcquisitionTimings(50_Hz, 1s), + std::vector({120, 320, 380, 560, 580, 600, 620, 640, 660, 680, 700, 740, + 760, 780, 840, 860, 880, 900, 920, 940, 960, 980, 1000})); } } // namespace Nighthawk From 1d62a00227ebb6e1d3a33e25ac6381d73075f35c Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Mon, 2 Dec 2019 16:58:22 +0100 Subject: [PATCH 05/12] save state Signed-off-by: Otto van der Schaaf --- include/nighthawk/common/rate_limiter.h | 6 +-- source/common/rate_limiter_impl.cc | 66 +++++++++++------------ source/common/rate_limiter_impl.h | 70 +++++++++++++++---------- test/mocks.h | 6 +-- test/rate_limiter_test.cc | 45 +++++++++------- 5 files changed, 105 insertions(+), 88 deletions(-) diff --git a/include/nighthawk/common/rate_limiter.h b/include/nighthawk/common/rate_limiter.h index 79c1d9191..93c8635eb 100644 --- a/include/nighthawk/common/rate_limiter.h +++ b/include/nighthawk/common/rate_limiter.h @@ -32,7 +32,7 @@ class RateLimiter { */ virtual Envoy::TimeSource& timeSource() PURE; virtual absl::optional timeStarted() const PURE; - virtual std::chrono::nanoseconds elapsed() const PURE; + virtual std::chrono::nanoseconds elapsed() PURE; }; using RateLimiterPtr = std::unique_ptr; @@ -43,8 +43,8 @@ using RateLimiterPtr = std::unique_ptr; class DiscreteNumericDistributionSampler { public: virtual ~DiscreteNumericDistributionSampler() = default; - virtual uint64_t min() PURE; - virtual uint64_t max() PURE; + virtual uint64_t min() const PURE; + virtual uint64_t max() const PURE; virtual uint64_t getValue() PURE; }; diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index 4abfb729c..7815fdea1 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -53,7 +53,8 @@ void BurstingRateLimiter::releaseOne() { } LinearRateLimiter::LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency) - : time_source_(time_source), acquireable_count_(0), acquired_count_(0), frequency_(frequency) { + : RateLimiterBaseImpl(time_source), acquireable_count_(0), acquired_count_(0), + frequency_(frequency) { if (frequency.value() <= 0) { throw NighthawkException("Frequency must be > 0"); } @@ -61,9 +62,6 @@ LinearRateLimiter::LinearRateLimiter(Envoy::TimeSource& time_source, const Frequ bool LinearRateLimiter::tryAcquireOne() { // TODO(oschaaf): consider adding an explicit start() call to the interface. - if (start_time_ == absl::nullopt) { - start_time_ = time_source_.monotonicTime(); - } if (acquireable_count_ > 0) { acquireable_count_--; acquired_count_++; @@ -71,10 +69,8 @@ bool LinearRateLimiter::tryAcquireOne() { } acquireable_count_ = - frequency_.value() == 0 - ? 0 - : static_cast(std::floor(elapsed() / frequency_.interval())) - acquired_count_; - return acquireable_count_ > 0 ? LinearRateLimiter::tryAcquireOne() : false; + static_cast(std::floor(elapsed() / frequency_.interval())) - acquired_count_; + return acquireable_count_ > 0 ? tryAcquireOne() : false; } void LinearRateLimiter::releaseOne() { @@ -82,6 +78,33 @@ void LinearRateLimiter::releaseOne() { acquired_count_--; } +LinearRampingRateLimiter::LinearRampingRateLimiter(Envoy::TimeSource& time_source, + const Frequency frequency) + : RateLimiterBaseImpl(time_source), frequency_(frequency) { + if (frequency_.value() <= 0) { + throw NighthawkException("frequency must be > 0"); + } +} + +bool LinearRampingRateLimiter::tryAcquireOne() { + if (acquireable_count_) { + acquired_count_++; + return acquireable_count_--; + } + const auto elapsed_time = elapsed(); + const std::chrono::duration chrono_seconds(elapsed_time); + const double seconds = chrono_seconds.count(); + const double frequency = seconds * frequency_.value(); + const uint64_t total = std::round(seconds * frequency / 2.0); + acquireable_count_ = total - acquired_count_; + return acquireable_count_ > 0 ? tryAcquireOne() : false; +} + +void LinearRampingRateLimiter::releaseOne() { + acquireable_count_++; + acquired_count_--; +} + DelegatingRateLimiter::DelegatingRateLimiter(RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator) : ForwardingRateLimiterImpl(std::move(rate_limiter)), @@ -115,33 +138,6 @@ bool FilteringRateLimiter::tryAcquireOne() { return rate_limiter_->tryAcquireOne() ? filter_() : false; } -LinearRampingRateLimiter::LinearRampingRateLimiter(Envoy::TimeSource& time_source, - const std::chrono::nanoseconds ramp_time, - const Frequency frequency) - : LinearRateLimiter(time_source, frequency), final_frequency_(frequency), - ramp_time_(ramp_time) { - if (ramp_time.count() <= 0) { - throw NighthawkException("ramp_time must be > 0"); - } -} - -bool LinearRampingRateLimiter::tryAcquireOne() { - bool return_value = false; - if (timeStarted() == absl::nullopt || (frequency_.value() != final_frequency_.value())) { - const double fraction = - 1.0 - ((ramp_time_.count() - elapsed().count()) / (ramp_time_.count() * 1.0)); - frequency_ = Frequency(std::round(final_frequency_.value() * fraction)); - // LinearRateLimiter tracks how many ought to have been acquired and will compensate when we - // change the frequency. We're greedy here to disable that corrective behaviour when ramping. - while (LinearRateLimiter::tryAcquireOne()) { - return_value = true; - } - } else { - return_value = LinearRateLimiter::tryAcquireOne(); - } - return return_value; -} - GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter( const std::chrono::nanoseconds ramp_time, DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter) diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index e75a8eade..0b37f384d 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -22,7 +22,7 @@ class ForwardingRateLimiterImpl : public RateLimiter { absl::optional timeStarted() const override { return rate_limiter_->timeStarted(); } - std::chrono::nanoseconds elapsed() const override { return rate_limiter_->elapsed(); } + std::chrono::nanoseconds elapsed() override { return rate_limiter_->elapsed(); } protected: const RateLimiterPtr rate_limiter_; @@ -50,30 +50,60 @@ class BurstingRateLimiter : public ForwardingRateLimiterImpl, absl::optional previously_releasing_; // Solely used for sanity checking. }; +/** + * A rate limiter which linearly ramps up to the desired frequency over the specified period. + */ +class RateLimiterBaseImpl : public RateLimiter { +public: + RateLimiterBaseImpl(Envoy::TimeSource& time_source) : time_source_(time_source){}; + Envoy::TimeSource& timeSource() override { return time_source_; } + absl::optional timeStarted() const override { return start_time_; } + std::chrono::nanoseconds elapsed() override { + // TODO(oschaaf): consider adding an explicit start() call to the interface. + const auto now = time_source_.monotonicTime(); + if (start_time_ == absl::nullopt) { + start_time_ = now; + } + return now - start_time_.value(); + } + +private: + Envoy::TimeSource& time_source_; + absl::optional start_time_; +}; + /** * Simple rate limiter that will allow acquiring at a linear pace. * The average rate is computed over a timeframe that starts at * the first call to tryAcquireOne(). */ -class LinearRateLimiter : public RateLimiter, +class LinearRateLimiter : public RateLimiterBaseImpl, public Envoy::Logger::Loggable { public: LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency); bool tryAcquireOne() override; void releaseOne() override; - Envoy::TimeSource& timeSource() override { return time_source_; } - absl::optional timeStarted() const override { return start_time_; } - std::chrono::nanoseconds elapsed() const override { - const auto now = time_source_.monotonicTime(); - return now - start_time_.value_or(now); - } protected: - Envoy::TimeSource& time_source_; int64_t acquireable_count_{0}; uint64_t acquired_count_{0}; - Frequency frequency_; - absl::optional start_time_; + const Frequency frequency_; +}; + +/** + * A rate limiter which linearly ramps up to the desired frequency over the specified period. + */ +class LinearRampingRateLimiter : public RateLimiterBaseImpl, + public Envoy::Logger::Loggable { +public: + LinearRampingRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency); + bool tryAcquireOne() override; + void releaseOne() override; + +private: + int64_t acquireable_count_{0}; + uint64_t acquired_count_{0}; + const Frequency frequency_; }; // We use an unsigned duration here to ensure only future points in time will be yielded. @@ -102,27 +132,13 @@ class DelegatingRateLimiter : public ForwardingRateLimiterImpl, absl::optional distributed_start_; }; -/** - * A rate limiter which linearly ramps up to the desired frequency over the specified period. - */ -class LinearRampingRateLimiter : public LinearRateLimiter { -public: - LinearRampingRateLimiter(Envoy::TimeSource& time_source, const std::chrono::nanoseconds ramp_time, - const Frequency frequency); - bool tryAcquireOne() override; - -private: - const Frequency final_frequency_; - const std::chrono::nanoseconds ramp_time_; -}; - class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionSampler { public: UniformRandomDistributionSamplerImpl(const uint64_t upper_bound) : distribution_(0, upper_bound) {} uint64_t getValue() override { return distribution_(generator_); } - uint64_t min() override { return distribution_.min(); } - uint64_t max() override { return distribution_.max(); } + uint64_t min() const override { return distribution_.min(); } + uint64_t max() const override { return distribution_.max(); } private: std::default_random_engine generator_; diff --git a/test/mocks.h b/test/mocks.h index 114c34951..ff464335e 100644 --- a/test/mocks.h +++ b/test/mocks.h @@ -50,7 +50,7 @@ class MockRateLimiter : public RateLimiter { MOCK_METHOD0(releaseOne, void()); MOCK_METHOD0(timeSource, Envoy::TimeSource&()); MOCK_CONST_METHOD0(timeStarted, absl::optional()); - MOCK_CONST_METHOD0(elapsed, std::chrono::nanoseconds()); + MOCK_METHOD0(elapsed, std::chrono::nanoseconds()); }; class MockSequencer : public Sequencer { @@ -189,8 +189,8 @@ class MockDiscreteNumericDistributionSampler : public DiscreteNumericDistributio public: MockDiscreteNumericDistributionSampler(); MOCK_METHOD0(getValue, uint64_t()); - MOCK_METHOD0(min, uint64_t()); - MOCK_METHOD0(max, uint64_t()); + MOCK_CONST_METHOD0(min, uint64_t()); + MOCK_CONST_METHOD0(max, uint64_t()); }; } // namespace Nighthawk diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index 273b96803..e00236b2f 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -172,49 +172,54 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) { EXPECT_TRUE(rate_limiter->tryAcquireOne()); } +// TODO(oschaaf): once we have hr sleep, test at a higher res. class LinearRampingRateLimiterTest : public Test { public: std::vector getAcquisitionTimings(const Frequency frequency, const std::chrono::seconds duration) { Envoy::Event::SimulatedTimeSystem time_system; std::vector aquisition_timings; - LinearRampingRateLimiter rate_limiter(time_system, duration, frequency); + LinearRampingRateLimiter rate_limiter(time_system, frequency); auto total_ms_elapsed = 0ms; - auto clock_tick = 1ms; + const auto clock_tick = 1ms; + auto last_acquisition_timestamp = 0ms; + EXPECT_FALSE(rate_limiter.tryAcquireOne()); do { - while (rate_limiter.tryAcquireOne()) { + if (rate_limiter.tryAcquireOne()) { + EXPECT_FALSE(rate_limiter.tryAcquireOne()); aquisition_timings.push_back(total_ms_elapsed.count()); + last_acquisition_timestamp = total_ms_elapsed; } + const auto expected_actual_total = + std::round((std::pow(total_ms_elapsed.count() / 1000.0, 2) * frequency.value()) / 2); + EXPECT_EQ(aquisition_timings.size(), expected_actual_total); time_system.sleep(clock_tick); total_ms_elapsed += clock_tick; } while (total_ms_elapsed <= duration); - EXPECT_FALSE(rate_limiter.tryAcquireOne()); - time_system.sleep(1s); - // Verify that after the rampup the expected constant pacing is maintained. - // Calls should be forwarded to the regular linear rate limiter algorithm with its - // corrective behavior so we can expect to acquire a series with that. - for (uint64_t i = 0; i < frequency.value(); i++) { - EXPECT_TRUE(rate_limiter.tryAcquireOne()); - } - // Verify we acquired everything. - EXPECT_FALSE(rate_limiter.tryAcquireOne()); + + const auto expected_total = std::round((std::pow(duration.count(), 2) * frequency.value()) / 2); + EXPECT_EQ(aquisition_timings.size(), expected_total); return aquisition_timings; } }; TEST_F(RateLimiterTest, LinearRampingRateLimiterInvalidArgumentTest) { Envoy::Event::SimulatedTimeSystem time_system; - EXPECT_THROW(LinearRampingRateLimiter rate_limiter(time_system, 1s, 0_Hz);, NighthawkException); - EXPECT_THROW(LinearRampingRateLimiter rate_limiter(time_system, 0s, 1_Hz);, NighthawkException); - EXPECT_THROW(LinearRampingRateLimiter rate_limiter(time_system, -1s, 1_Hz);, NighthawkException); + EXPECT_THROW(LinearRampingRateLimiter rate_limiter(time_system, 0_Hz);, NighthawkException); } TEST_F(LinearRampingRateLimiterTest, TimingVerificationTest) { - EXPECT_EQ(getAcquisitionTimings(5_Hz, 5s), - std::vector({1000, 1500, 2000, 2500, 2667, 3000, 3334, 3500, 3750, 4000, 4250, - 4500, 4600, 4800, 5000})); + EXPECT_EQ(getAcquisitionTimings(1_Hz, 5s), + std::vector( + {1000, 1733, 2237, 2646, 3000, 3317, 3606, 3873, 4124, 4359, 4583, 4796, 5000})); + EXPECT_EQ(getAcquisitionTimings(7_Hz, 2s), + std::vector( + {378, 655, 846, 1000, 1134, 1254, 1363, 1464, 1559, 1648, 1733, 1813, 1890, 1964})); + getAcquisitionTimings(7_Hz, 68s); + getAcquisitionTimings(10_Hz, 5s); + getAcquisitionTimings(9_Hz, 3s); } class GraduallyOpeningRateLimiterFilterTest : public Test { @@ -270,7 +275,7 @@ class GraduallyOpeningRateLimiterFilterTest : public Test { }; TEST_F(GraduallyOpeningRateLimiterFilterTest, TimingVerificationTest) { - EXPECT_EQ(getAcquisitionTimings(10_Hz, 10s), std::vector({})); + // EXPECT_EQ(getAcquisitionTimings(10_Hz, 10s), std::vector({})); EXPECT_EQ(getAcquisitionTimings(50_Hz, 1s), std::vector({120, 320, 380, 560, 580, 600, 620, 640, 660, 680, 700, 740, 760, 780, 840, 860, 880, 900, 920, 940, 960, 980, 1000})); From c86cb9278a9493122b14295f2d063a38b74a2e62 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Mon, 2 Dec 2019 17:05:20 +0100 Subject: [PATCH 06/12] Back out change to constness in Frequency Signed-off-by: Otto van der Schaaf --- source/common/frequency.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/frequency.h b/source/common/frequency.h index f3bb1afcc..0c1fc652a 100644 --- a/source/common/frequency.h +++ b/source/common/frequency.h @@ -13,8 +13,8 @@ class Frequency { const std::chrono::duration interval() const { return interval_; } private: - uint64_t hertz_; - std::chrono::duration interval_; + const uint64_t hertz_; + const std::chrono::duration interval_; }; constexpr Frequency operator"" _Hz(unsigned long long hz) { return Frequency{hz}; } From 7fc299215645dfed697499e5c4124da191b307f4 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Mon, 2 Dec 2019 23:03:16 +0100 Subject: [PATCH 07/12] Save state, wire in zipf / foo ZipfRateLimiter Signed-off-by: Otto van der Schaaf --- include/nighthawk/common/rate_limiter.h | 2 ++ source/common/BUILD | 1 + source/common/rate_limiter_impl.cc | 20 ++++++++++++-------- source/common/rate_limiter_impl.h | 14 ++++++++++++++ test/rate_limiter_test.cc | 6 ++++++ 5 files changed, 35 insertions(+), 8 deletions(-) diff --git a/include/nighthawk/common/rate_limiter.h b/include/nighthawk/common/rate_limiter.h index 93c8635eb..3af2e2581 100644 --- a/include/nighthawk/common/rate_limiter.h +++ b/include/nighthawk/common/rate_limiter.h @@ -1,8 +1,10 @@ #pragma once +#include #include #include "envoy/common/pure.h" +#include "envoy/common/time.h" #include "absl/types/optional.h" diff --git a/source/common/BUILD b/source/common/BUILD index 2ad555f7a..fbd273a70 100644 --- a/source/common/BUILD +++ b/source/common/BUILD @@ -39,6 +39,7 @@ envoy_cc_library( "//api/client:grpc_service_lib", "//include/nighthawk/client:client_includes", "//include/nighthawk/common:base_includes", + "@com_google_absl//absl/random", "@dep_hdrhistogram_c//:hdrhistogram_c", "@envoy//source/common/common:assert_lib_with_external_headers", "@envoy//source/common/common:lock_guard_lib_with_external_headers", diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index 7815fdea1..a6ca95cd3 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -131,6 +131,13 @@ void DelegatingRateLimiter::releaseOne() { rate_limiter_->releaseOne(); } +DistributionSamplingRateLimiterImpl::DistributionSamplingRateLimiterImpl( + DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter) + : DelegatingRateLimiter( + std::move(rate_limiter), + [this]() { return std::chrono::duration(provider_->getValue()); }), + provider_(std::move(provider)) {} + FilteringRateLimiter::FilteringRateLimiter(RateLimiterPtr&& rate_limiter, RateLimiterFilter filter) : ForwardingRateLimiterImpl(std::move(rate_limiter)), filter_(std::move(filter)) {} @@ -146,8 +153,9 @@ GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter( [this]() { if (elapsed() < ramp_time_) { const double chance_percentage = - 100.0 - - ((ramp_time_.count() - elapsed().count()) / (ramp_time_.count() * 1.0)) * 100.0; + 100.0 - (static_cast(ramp_time_.count() - elapsed().count()) / + (ramp_time_.count() * 1.0)) * + 100.0; return std::round(provider_->getValue() / 10000.0) <= chance_percentage; } return true; @@ -157,11 +165,7 @@ GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter( "expected a distribution ranging from 1-1000000"); } -DistributionSamplingRateLimiterImpl::DistributionSamplingRateLimiterImpl( - DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter) - : DelegatingRateLimiter( - std::move(rate_limiter), - [this]() { return std::chrono::duration(provider_->getValue()); }), - provider_(std::move(provider)) {} +ZipfRateLimiter::ZipfRateLimiter(RateLimiterPtr&& rate_limiter) + : FilteringRateLimiter(std::move(rate_limiter), [this]() { return dist_(g_); }), dist_(1) {} } // namespace Nighthawk \ No newline at end of file diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index 0b37f384d..e446d5e30 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -10,6 +10,8 @@ #include "common/frequency.h" +#include "absl/random/random.h" +#include "absl/random/zipf_distribution.h" #include "absl/types/optional.h" namespace Nighthawk { @@ -168,6 +170,9 @@ class FilteringRateLimiter : public ForwardingRateLimiterImpl, const RateLimiterFilter filter_; }; +/** + * Takes a probabilistic approach to suppress + */ class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiter { public: GraduallyOpeningRateLimiterFilter(const std::chrono::nanoseconds ramp_time, @@ -179,4 +184,13 @@ class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiter { const std::chrono::nanoseconds ramp_time_; }; +class ZipfRateLimiter : public FilteringRateLimiter { +public: + ZipfRateLimiter(RateLimiterPtr&& rate_limiter); + +private: + absl::zipf_distribution dist_; + absl::InsecureBitGen g_; +}; + } // namespace Nighthawk \ No newline at end of file diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index e00236b2f..f2a03f34d 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -281,4 +281,10 @@ TEST_F(GraduallyOpeningRateLimiterFilterTest, TimingVerificationTest) { 760, 780, 840, 860, 880, 900, 920, 940, 960, 980, 1000})); } +class ZipfRateLimiterTest : public Test {}; + +TEST_F(ZipfRateLimiterTest, TimingVerificationTest) { + // TODO(oschaaf): fix zipf distribution based rate limiter, add the real thing. +} + } // namespace Nighthawk From 9def3cdc794f5feb4e023d5801ca6b1b7b8062df Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Tue, 3 Dec 2019 00:19:07 +0100 Subject: [PATCH 08/12] Some comments & tidying up Signed-off-by: Otto van der Schaaf --- source/common/rate_limiter_impl.cc | 29 +++---- source/common/rate_limiter_impl.h | 123 ++++++++++++++++------------- test/rate_limiter_test.cc | 17 ++-- 3 files changed, 89 insertions(+), 80 deletions(-) diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index a6ca95cd3..228d4d521 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -78,15 +78,15 @@ void LinearRateLimiter::releaseOne() { acquired_count_--; } -LinearRampingRateLimiter::LinearRampingRateLimiter(Envoy::TimeSource& time_source, - const Frequency frequency) +LinearRampingRateLimiterImpl::LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source, + const Frequency frequency) : RateLimiterBaseImpl(time_source), frequency_(frequency) { if (frequency_.value() <= 0) { throw NighthawkException("frequency must be > 0"); } } -bool LinearRampingRateLimiter::tryAcquireOne() { +bool LinearRampingRateLimiterImpl::tryAcquireOne() { if (acquireable_count_) { acquired_count_++; return acquireable_count_--; @@ -100,17 +100,17 @@ bool LinearRampingRateLimiter::tryAcquireOne() { return acquireable_count_ > 0 ? tryAcquireOne() : false; } -void LinearRampingRateLimiter::releaseOne() { +void LinearRampingRateLimiterImpl::releaseOne() { acquireable_count_++; acquired_count_--; } -DelegatingRateLimiter::DelegatingRateLimiter(RateLimiterPtr&& rate_limiter, - RateLimiterDelegate random_distribution_generator) +DelegatingRateLimiterImpl::DelegatingRateLimiterImpl( + RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator) : ForwardingRateLimiterImpl(std::move(rate_limiter)), random_distribution_generator_(std::move(random_distribution_generator)) {} -bool DelegatingRateLimiter::tryAcquireOne() { +bool DelegatingRateLimiterImpl::tryAcquireOne() { const auto now = timeSource().monotonicTime(); if (distributed_start_ == absl::nullopt) { if (rate_limiter_->tryAcquireOne()) { @@ -126,29 +126,30 @@ bool DelegatingRateLimiter::tryAcquireOne() { return false; } -void DelegatingRateLimiter::releaseOne() { +void DelegatingRateLimiterImpl::releaseOne() { distributed_start_ = absl::nullopt; rate_limiter_->releaseOne(); } DistributionSamplingRateLimiterImpl::DistributionSamplingRateLimiterImpl( DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter) - : DelegatingRateLimiter( + : DelegatingRateLimiterImpl( std::move(rate_limiter), [this]() { return std::chrono::duration(provider_->getValue()); }), provider_(std::move(provider)) {} -FilteringRateLimiter::FilteringRateLimiter(RateLimiterPtr&& rate_limiter, RateLimiterFilter filter) +FilteringRateLimiterImpl::FilteringRateLimiterImpl(RateLimiterPtr&& rate_limiter, + RateLimiterFilter filter) : ForwardingRateLimiterImpl(std::move(rate_limiter)), filter_(std::move(filter)) {} -bool FilteringRateLimiter::tryAcquireOne() { +bool FilteringRateLimiterImpl::tryAcquireOne() { return rate_limiter_->tryAcquireOne() ? filter_() : false; } GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter( const std::chrono::nanoseconds ramp_time, DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter) - : FilteringRateLimiter( + : FilteringRateLimiterImpl( std::move(rate_limiter), [this]() { if (elapsed() < ramp_time_) { @@ -165,7 +166,7 @@ GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter( "expected a distribution ranging from 1-1000000"); } -ZipfRateLimiter::ZipfRateLimiter(RateLimiterPtr&& rate_limiter) - : FilteringRateLimiter(std::move(rate_limiter), [this]() { return dist_(g_); }), dist_(1) {} +ZipfRateLimiterImpl::ZipfRateLimiterImpl(RateLimiterPtr&& rate_limiter) + : FilteringRateLimiterImpl(std::move(rate_limiter), [this]() { return dist_(g_); }), dist_(1) {} } // namespace Nighthawk \ No newline at end of file diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index e446d5e30..ea0712bd5 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -16,44 +16,8 @@ namespace Nighthawk { -class ForwardingRateLimiterImpl : public RateLimiter { -public: - ForwardingRateLimiterImpl(RateLimiterPtr&& rate_limiter) - : rate_limiter_(std::move(rate_limiter)) {} - Envoy::TimeSource& timeSource() override { return rate_limiter_->timeSource(); } - absl::optional timeStarted() const override { - return rate_limiter_->timeStarted(); - } - std::chrono::nanoseconds elapsed() override { return rate_limiter_->elapsed(); } - -protected: - const RateLimiterPtr rate_limiter_; -}; - /** - * BurstingRatelimiter can be wrapped around another rate limiter. It has two modes: - * 1. First it will be accumulating acquisitions by forwarding calls to the wrapped - * rate limiter, until the accumulated acquisitions equals the specified burst size. - * 2. Release mode. In this mode, BatchingRateLimiter is in control and will be handling - * acquisition calls (returning true and substracting from the accumulated total until - * nothing is left, after which mode 1 will be entered again). - */ -class BurstingRateLimiter : public ForwardingRateLimiterImpl, - public Envoy::Logger::Loggable { -public: - BurstingRateLimiter(RateLimiterPtr&& rate_limiter, const uint64_t burst_size); - bool tryAcquireOne() override; - void releaseOne() override; - -private: - const uint64_t burst_size_; - uint64_t accumulated_{0}; - bool releasing_{}; - absl::optional previously_releasing_; // Solely used for sanity checking. -}; - -/** - * A rate limiter which linearly ramps up to the desired frequency over the specified period. + * Rate limiter base class, which implements some shared functionality. */ class RateLimiterBaseImpl : public RateLimiter { public: @@ -95,10 +59,10 @@ class LinearRateLimiter : public RateLimiterBaseImpl, /** * A rate limiter which linearly ramps up to the desired frequency over the specified period. */ -class LinearRampingRateLimiter : public RateLimiterBaseImpl, - public Envoy::Logger::Loggable { +class LinearRampingRateLimiterImpl : public RateLimiterBaseImpl, + public Envoy::Logger::Loggable { public: - LinearRampingRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency); + LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source, const Frequency frequency); bool tryAcquireOne() override; void releaseOne() override; @@ -108,22 +72,62 @@ class LinearRampingRateLimiter : public RateLimiterBaseImpl, const Frequency frequency_; }; -// We use an unsigned duration here to ensure only future points in time will be yielded. -// The consuming rate limiter will hold off opening up until the initial point in time plus the -// offset obtained via the delegate have transpired. -using RateLimiterDelegate = std::function()>; +/** + * Base for a rate limiter which wraps another rate limiter, and forwards + * some calls. + */ +class ForwardingRateLimiterImpl : public RateLimiter { +public: + ForwardingRateLimiterImpl(RateLimiterPtr&& rate_limiter) + : rate_limiter_(std::move(rate_limiter)) {} + Envoy::TimeSource& timeSource() override { return rate_limiter_->timeSource(); } + absl::optional timeStarted() const override { + return rate_limiter_->timeStarted(); + } + std::chrono::nanoseconds elapsed() override { return rate_limiter_->elapsed(); } -using RateLimiterFilter = std::function; +protected: + const RateLimiterPtr rate_limiter_; +}; + +/** + * BurstingRatelimiter can be wrapped around another rate limiter. It has two modes: + * 1. First it will be accumulating acquisitions by forwarding calls to the wrapped + * rate limiter, until the accumulated acquisitions equals the specified burst size. + * 2. Release mode. In this mode, BatchingRateLimiter is in control and will be handling + * acquisition calls (returning true and substracting from the accumulated total until + * nothing is left, after which mode 1 will be entered again). + */ +class BurstingRateLimiter : public ForwardingRateLimiterImpl, + public Envoy::Logger::Loggable { +public: + BurstingRateLimiter(RateLimiterPtr&& rate_limiter, const uint64_t burst_size); + bool tryAcquireOne() override; + void releaseOne() override; + +private: + const uint64_t burst_size_; + uint64_t accumulated_{0}; + bool releasing_{}; + absl::optional previously_releasing_; // Solely used for sanity checking. +}; + +/** + * The consuming rate limiter will hold off opening up until the initial point in time plus the + * offset obtained via the delegate have transpired. + * We use an unsigned duration here to ensure only future points in time will be yielded. + */ +using RateLimiterDelegate = std::function()>; /** * Wraps a rate limiter, and allows plugging in a delegate which will be queried to offset the * timing of the underlying rate limiter. */ -class DelegatingRateLimiter : public ForwardingRateLimiterImpl, - public Envoy::Logger::Loggable { +class DelegatingRateLimiterImpl : public ForwardingRateLimiterImpl, + public Envoy::Logger::Loggable { public: - DelegatingRateLimiter(RateLimiterPtr&& rate_limiter, - RateLimiterDelegate random_distribution_generator); + DelegatingRateLimiterImpl(RateLimiterPtr&& rate_limiter, + RateLimiterDelegate random_distribution_generator); bool tryAcquireOne() override; void releaseOne() override; @@ -148,7 +152,7 @@ class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionS }; // Allows adding uniformly distributed random timing offsets to an underlying rate limiter. -class DistributionSamplingRateLimiterImpl : public DelegatingRateLimiter { +class DistributionSamplingRateLimiterImpl : public DelegatingRateLimiterImpl { public: DistributionSamplingRateLimiterImpl(DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter); @@ -157,12 +161,17 @@ class DistributionSamplingRateLimiterImpl : public DelegatingRateLimiter { DiscreteNumericDistributionSamplerPtr provider_; }; +/** + * Callback used to indicate if a rate limiter release should be supressed or not. + */ +using RateLimiterFilter = std::function; + // Wraps a rate limiter, and allows plugging in a delegate which will be queried to apply a // filter to acquisitions. -class FilteringRateLimiter : public ForwardingRateLimiterImpl, - public Envoy::Logger::Loggable { +class FilteringRateLimiterImpl : public ForwardingRateLimiterImpl, + public Envoy::Logger::Loggable { public: - FilteringRateLimiter(RateLimiterPtr&& rate_limiter, RateLimiterFilter filter); + FilteringRateLimiterImpl(RateLimiterPtr&& rate_limiter, RateLimiterFilter filter); bool tryAcquireOne() override; void releaseOne() override { rate_limiter_->releaseOne(); } @@ -171,9 +180,9 @@ class FilteringRateLimiter : public ForwardingRateLimiterImpl, }; /** - * Takes a probabilistic approach to suppress + * Takes a probabilistic approach to suppressing an arbitrary wrapper rate limiter. */ -class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiter { +class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiterImpl { public: GraduallyOpeningRateLimiterFilter(const std::chrono::nanoseconds ramp_time, DiscreteNumericDistributionSamplerPtr&& provider, @@ -184,9 +193,9 @@ class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiter { const std::chrono::nanoseconds ramp_time_; }; -class ZipfRateLimiter : public FilteringRateLimiter { +class ZipfRateLimiterImpl : public FilteringRateLimiterImpl { public: - ZipfRateLimiter(RateLimiterPtr&& rate_limiter); + ZipfRateLimiterImpl(RateLimiterPtr&& rate_limiter); private: absl::zipf_distribution dist_; diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index f2a03f34d..ed0f00715 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -125,7 +125,7 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplTest) { acquisitions++; } // We test the release gets propagated to the mock rate limiter. - // also, the release will force DelegatingRateLimiter to propagate tryAcquireOne. + // also, the release will force DelegatingRateLimiterImpl to propagate tryAcquireOne. rate_limiter->releaseOne(); } // 1 in a billion chance of failure. @@ -173,13 +173,13 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) { } // TODO(oschaaf): once we have hr sleep, test at a higher res. -class LinearRampingRateLimiterTest : public Test { +class LinearRampingRateLimiterImplTest : public Test { public: std::vector getAcquisitionTimings(const Frequency frequency, const std::chrono::seconds duration) { Envoy::Event::SimulatedTimeSystem time_system; std::vector aquisition_timings; - LinearRampingRateLimiter rate_limiter(time_system, frequency); + LinearRampingRateLimiterImpl rate_limiter(time_system, frequency); auto total_ms_elapsed = 0ms; const auto clock_tick = 1ms; auto last_acquisition_timestamp = 0ms; @@ -205,12 +205,12 @@ class LinearRampingRateLimiterTest : public Test { } }; -TEST_F(RateLimiterTest, LinearRampingRateLimiterInvalidArgumentTest) { +TEST_F(RateLimiterTest, LinearRampingRateLimiterImplInvalidArgumentTest) { Envoy::Event::SimulatedTimeSystem time_system; - EXPECT_THROW(LinearRampingRateLimiter rate_limiter(time_system, 0_Hz);, NighthawkException); + EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, 0_Hz);, NighthawkException); } -TEST_F(LinearRampingRateLimiterTest, TimingVerificationTest) { +TEST_F(LinearRampingRateLimiterImplTest, TimingVerificationTest) { EXPECT_EQ(getAcquisitionTimings(1_Hz, 5s), std::vector( {1000, 1733, 2237, 2646, 3000, 3317, 3606, 3873, 4124, 4359, 4583, 4796, 5000})); @@ -275,15 +275,14 @@ class GraduallyOpeningRateLimiterFilterTest : public Test { }; TEST_F(GraduallyOpeningRateLimiterFilterTest, TimingVerificationTest) { - // EXPECT_EQ(getAcquisitionTimings(10_Hz, 10s), std::vector({})); EXPECT_EQ(getAcquisitionTimings(50_Hz, 1s), std::vector({120, 320, 380, 560, 580, 600, 620, 640, 660, 680, 700, 740, 760, 780, 840, 860, 880, 900, 920, 940, 960, 980, 1000})); } -class ZipfRateLimiterTest : public Test {}; +class ZipfRateLimiterImplTest : public Test {}; -TEST_F(ZipfRateLimiterTest, TimingVerificationTest) { +TEST_F(ZipfRateLimiterImplTest, TimingVerificationTest) { // TODO(oschaaf): fix zipf distribution based rate limiter, add the real thing. } From 4c54837acbed99db29bef4b51860def0fc8a96a8 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Tue, 3 Dec 2019 11:53:44 +0100 Subject: [PATCH 09/12] small cleanup Signed-off-by: Otto van der Schaaf --- include/nighthawk/common/rate_limiter.h | 3 --- source/common/rate_limiter_impl.cc | 5 +---- source/common/rate_limiter_impl.h | 6 ------ test/mocks.h | 1 - test/rate_limiter_test.cc | 7 ------- 5 files changed, 1 insertion(+), 21 deletions(-) diff --git a/include/nighthawk/common/rate_limiter.h b/include/nighthawk/common/rate_limiter.h index 3af2e2581..4aaeaefdb 100644 --- a/include/nighthawk/common/rate_limiter.h +++ b/include/nighthawk/common/rate_limiter.h @@ -33,7 +33,6 @@ class RateLimiter { * @return Envoy::TimeSource& time_source */ virtual Envoy::TimeSource& timeSource() PURE; - virtual absl::optional timeStarted() const PURE; virtual std::chrono::nanoseconds elapsed() PURE; }; @@ -45,8 +44,6 @@ using RateLimiterPtr = std::unique_ptr; class DiscreteNumericDistributionSampler { public: virtual ~DiscreteNumericDistributionSampler() = default; - virtual uint64_t min() const PURE; - virtual uint64_t max() const PURE; virtual uint64_t getValue() PURE; }; diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index 228d4d521..78c48bc1d 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -161,10 +161,7 @@ GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter( } return true; }), - provider_(std::move(provider)), ramp_time_(ramp_time) { - RELEASE_ASSERT(provider_->min() == 1 && provider_->max() == 1000000, - "expected a distribution ranging from 1-1000000"); -} + provider_(std::move(provider)), ramp_time_(ramp_time) {} ZipfRateLimiterImpl::ZipfRateLimiterImpl(RateLimiterPtr&& rate_limiter) : FilteringRateLimiterImpl(std::move(rate_limiter), [this]() { return dist_(g_); }), dist_(1) {} diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index ea0712bd5..299c4e1df 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -23,7 +23,6 @@ class RateLimiterBaseImpl : public RateLimiter { public: RateLimiterBaseImpl(Envoy::TimeSource& time_source) : time_source_(time_source){}; Envoy::TimeSource& timeSource() override { return time_source_; } - absl::optional timeStarted() const override { return start_time_; } std::chrono::nanoseconds elapsed() override { // TODO(oschaaf): consider adding an explicit start() call to the interface. const auto now = time_source_.monotonicTime(); @@ -81,9 +80,6 @@ class ForwardingRateLimiterImpl : public RateLimiter { ForwardingRateLimiterImpl(RateLimiterPtr&& rate_limiter) : rate_limiter_(std::move(rate_limiter)) {} Envoy::TimeSource& timeSource() override { return rate_limiter_->timeSource(); } - absl::optional timeStarted() const override { - return rate_limiter_->timeStarted(); - } std::chrono::nanoseconds elapsed() override { return rate_limiter_->elapsed(); } protected: @@ -143,8 +139,6 @@ class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionS UniformRandomDistributionSamplerImpl(const uint64_t upper_bound) : distribution_(0, upper_bound) {} uint64_t getValue() override { return distribution_(generator_); } - uint64_t min() const override { return distribution_.min(); } - uint64_t max() const override { return distribution_.max(); } private: std::default_random_engine generator_; diff --git a/test/mocks.h b/test/mocks.h index ff464335e..f87584a91 100644 --- a/test/mocks.h +++ b/test/mocks.h @@ -49,7 +49,6 @@ class MockRateLimiter : public RateLimiter { MOCK_METHOD0(tryAcquireOne, bool()); MOCK_METHOD0(releaseOne, void()); MOCK_METHOD0(timeSource, Envoy::TimeSource&()); - MOCK_CONST_METHOD0(timeStarted, absl::optional()); MOCK_METHOD0(elapsed, std::chrono::nanoseconds()); }; diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index ed0f00715..9f9cd9b2b 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -235,13 +235,6 @@ class GraduallyOpeningRateLimiterFilterTest : public Test { EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, getValue) .Times(AtLeast(1)) .WillRepeatedly(Invoke([&dist, &mt]() { return dist(mt); })); - EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, min) - .Times(1) - .WillOnce(Return(dist.min())); - EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, max) - .Times(1) - .WillOnce(Return(dist.max())); - RateLimiterPtr rate_limiter = std::make_unique( duration, std::unique_ptr( From 88d335ef87d3eabccb38376170ebf5713348e298 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Fri, 6 Dec 2019 23:12:18 +0100 Subject: [PATCH 10/12] rate-limiting: test enhancement + refactoring Modulo an improved test for the batching rate limiter, this pulls off some refactoring for upcoming new rate limiters. Split out off from draft PR #218 Signed-off-by: Otto van der Schaaf --- source/client/factories_impl.cc | 2 - source/common/BUILD | 1 - source/common/rate_limiter_impl.cc | 47 ------------- source/common/rate_limiter_impl.h | 41 ----------- test/rate_limiter_test.cc | 107 ----------------------------- 5 files changed, 198 deletions(-) diff --git a/source/client/factories_impl.cc b/source/client/factories_impl.cc index 32123c1ab..a35d8959b 100644 --- a/source/client/factories_impl.cc +++ b/source/client/factories_impl.cc @@ -54,8 +54,6 @@ SequencerPtr SequencerFactoryImpl::create(Envoy::TimeSource& time_source, StatisticFactoryImpl statistic_factory(options_); RateLimiterPtr rate_limiter = std::make_unique(time_source, Frequency(options_.requestsPerSecond())); - // TODO(oschaaf): If we want to take this forward, conditionally construct the ramping rate - // limiter here, e.g.. const uint64_t burst_size = options_.burstSize(); if (burst_size) { diff --git a/source/common/BUILD b/source/common/BUILD index 86ffe2cef..9c5588edb 100644 --- a/source/common/BUILD +++ b/source/common/BUILD @@ -67,7 +67,6 @@ envoy_cc_library( "//api/client:grpc_service_lib", "//include/nighthawk/client:client_includes", "//include/nighthawk/common:base_includes", - "@com_google_absl//absl/random", "@dep_hdrhistogram_c//:hdrhistogram_c", "@envoy//source/common/common:assert_lib_with_external_headers", "@envoy//source/common/common:lock_guard_lib_with_external_headers", diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index 78c48bc1d..7fb3d9481 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -78,33 +78,6 @@ void LinearRateLimiter::releaseOne() { acquired_count_--; } -LinearRampingRateLimiterImpl::LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source, - const Frequency frequency) - : RateLimiterBaseImpl(time_source), frequency_(frequency) { - if (frequency_.value() <= 0) { - throw NighthawkException("frequency must be > 0"); - } -} - -bool LinearRampingRateLimiterImpl::tryAcquireOne() { - if (acquireable_count_) { - acquired_count_++; - return acquireable_count_--; - } - const auto elapsed_time = elapsed(); - const std::chrono::duration chrono_seconds(elapsed_time); - const double seconds = chrono_seconds.count(); - const double frequency = seconds * frequency_.value(); - const uint64_t total = std::round(seconds * frequency / 2.0); - acquireable_count_ = total - acquired_count_; - return acquireable_count_ > 0 ? tryAcquireOne() : false; -} - -void LinearRampingRateLimiterImpl::releaseOne() { - acquireable_count_++; - acquired_count_--; -} - DelegatingRateLimiterImpl::DelegatingRateLimiterImpl( RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator) : ForwardingRateLimiterImpl(std::move(rate_limiter)), @@ -146,24 +119,4 @@ bool FilteringRateLimiterImpl::tryAcquireOne() { return rate_limiter_->tryAcquireOne() ? filter_() : false; } -GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter( - const std::chrono::nanoseconds ramp_time, DiscreteNumericDistributionSamplerPtr&& provider, - RateLimiterPtr&& rate_limiter) - : FilteringRateLimiterImpl( - std::move(rate_limiter), - [this]() { - if (elapsed() < ramp_time_) { - const double chance_percentage = - 100.0 - (static_cast(ramp_time_.count() - elapsed().count()) / - (ramp_time_.count() * 1.0)) * - 100.0; - return std::round(provider_->getValue() / 10000.0) <= chance_percentage; - } - return true; - }), - provider_(std::move(provider)), ramp_time_(ramp_time) {} - -ZipfRateLimiterImpl::ZipfRateLimiterImpl(RateLimiterPtr&& rate_limiter) - : FilteringRateLimiterImpl(std::move(rate_limiter), [this]() { return dist_(g_); }), dist_(1) {} - } // namespace Nighthawk \ No newline at end of file diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index 299c4e1df..186350e8b 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -10,8 +10,6 @@ #include "common/frequency.h" -#include "absl/random/random.h" -#include "absl/random/zipf_distribution.h" #include "absl/types/optional.h" namespace Nighthawk { @@ -55,22 +53,6 @@ class LinearRateLimiter : public RateLimiterBaseImpl, const Frequency frequency_; }; -/** - * A rate limiter which linearly ramps up to the desired frequency over the specified period. - */ -class LinearRampingRateLimiterImpl : public RateLimiterBaseImpl, - public Envoy::Logger::Loggable { -public: - LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source, const Frequency frequency); - bool tryAcquireOne() override; - void releaseOne() override; - -private: - int64_t acquireable_count_{0}; - uint64_t acquired_count_{0}; - const Frequency frequency_; -}; - /** * Base for a rate limiter which wraps another rate limiter, and forwards * some calls. @@ -173,27 +155,4 @@ class FilteringRateLimiterImpl : public ForwardingRateLimiterImpl, const RateLimiterFilter filter_; }; -/** - * Takes a probabilistic approach to suppressing an arbitrary wrapper rate limiter. - */ -class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiterImpl { -public: - GraduallyOpeningRateLimiterFilter(const std::chrono::nanoseconds ramp_time, - DiscreteNumericDistributionSamplerPtr&& provider, - RateLimiterPtr&& rate_limiter); - -private: - DiscreteNumericDistributionSamplerPtr provider_; - const std::chrono::nanoseconds ramp_time_; -}; - -class ZipfRateLimiterImpl : public FilteringRateLimiterImpl { -public: - ZipfRateLimiterImpl(RateLimiterPtr&& rate_limiter); - -private: - absl::zipf_distribution dist_; - absl::InsecureBitGen g_; -}; - } // namespace Nighthawk \ No newline at end of file diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index 9f9cd9b2b..018939998 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -172,111 +172,4 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) { EXPECT_TRUE(rate_limiter->tryAcquireOne()); } -// TODO(oschaaf): once we have hr sleep, test at a higher res. -class LinearRampingRateLimiterImplTest : public Test { -public: - std::vector getAcquisitionTimings(const Frequency frequency, - const std::chrono::seconds duration) { - Envoy::Event::SimulatedTimeSystem time_system; - std::vector aquisition_timings; - LinearRampingRateLimiterImpl rate_limiter(time_system, frequency); - auto total_ms_elapsed = 0ms; - const auto clock_tick = 1ms; - auto last_acquisition_timestamp = 0ms; - - EXPECT_FALSE(rate_limiter.tryAcquireOne()); - - do { - if (rate_limiter.tryAcquireOne()) { - EXPECT_FALSE(rate_limiter.tryAcquireOne()); - aquisition_timings.push_back(total_ms_elapsed.count()); - last_acquisition_timestamp = total_ms_elapsed; - } - const auto expected_actual_total = - std::round((std::pow(total_ms_elapsed.count() / 1000.0, 2) * frequency.value()) / 2); - EXPECT_EQ(aquisition_timings.size(), expected_actual_total); - time_system.sleep(clock_tick); - total_ms_elapsed += clock_tick; - } while (total_ms_elapsed <= duration); - - const auto expected_total = std::round((std::pow(duration.count(), 2) * frequency.value()) / 2); - EXPECT_EQ(aquisition_timings.size(), expected_total); - return aquisition_timings; - } -}; - -TEST_F(RateLimiterTest, LinearRampingRateLimiterImplInvalidArgumentTest) { - Envoy::Event::SimulatedTimeSystem time_system; - EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, 0_Hz);, NighthawkException); -} - -TEST_F(LinearRampingRateLimiterImplTest, TimingVerificationTest) { - EXPECT_EQ(getAcquisitionTimings(1_Hz, 5s), - std::vector( - {1000, 1733, 2237, 2646, 3000, 3317, 3606, 3873, 4124, 4359, 4583, 4796, 5000})); - EXPECT_EQ(getAcquisitionTimings(7_Hz, 2s), - std::vector( - {378, 655, 846, 1000, 1134, 1254, 1363, 1464, 1559, 1648, 1733, 1813, 1890, 1964})); - getAcquisitionTimings(7_Hz, 68s); - getAcquisitionTimings(10_Hz, 5s); - getAcquisitionTimings(9_Hz, 3s); -} - -class GraduallyOpeningRateLimiterFilterTest : public Test { -public: - std::vector getAcquisitionTimings(const Frequency frequency, - const std::chrono::seconds duration) { - Envoy::Event::SimulatedTimeSystem time_system; - std::vector aquisition_timings; - auto* unsafe_discrete_numeric_distribution_sampler = - new MockDiscreteNumericDistributionSampler(); - std::mt19937_64 mt(1243); - std::uniform_int_distribution dist(1, 1000000); - EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, getValue) - .Times(AtLeast(1)) - .WillRepeatedly(Invoke([&dist, &mt]() { return dist(mt); })); - RateLimiterPtr rate_limiter = std::make_unique( - duration, - std::unique_ptr( - unsafe_discrete_numeric_distribution_sampler), - std::make_unique(time_system, frequency)); - auto total_ms_elapsed = 0ms; - auto clock_tick = 1ms; - EXPECT_FALSE(rate_limiter->tryAcquireOne()); - - do { - if (rate_limiter->tryAcquireOne()) { - aquisition_timings.push_back(total_ms_elapsed.count()); - EXPECT_FALSE(rate_limiter->tryAcquireOne()); - } - time_system.sleep(clock_tick); - total_ms_elapsed += clock_tick; - } while (total_ms_elapsed <= duration); - - EXPECT_FALSE(rate_limiter->tryAcquireOne()); - time_system.sleep(1s); - // Verify that after the rampup the expected constant pacing is maintained. - // Calls should be forwarded to the regular linear rate limiter algorithm with its - // corrective behavior so we can expect to acquire a series with that. - for (uint64_t i = 0; i < frequency.value(); i++) { - EXPECT_TRUE(rate_limiter->tryAcquireOne()); - } - // Verify we acquired everything. - EXPECT_FALSE(rate_limiter->tryAcquireOne()); - return aquisition_timings; - } -}; - -TEST_F(GraduallyOpeningRateLimiterFilterTest, TimingVerificationTest) { - EXPECT_EQ(getAcquisitionTimings(50_Hz, 1s), - std::vector({120, 320, 380, 560, 580, 600, 620, 640, 660, 680, 700, 740, - 760, 780, 840, 860, 880, 900, 920, 940, 960, 980, 1000})); -} - -class ZipfRateLimiterImplTest : public Test {}; - -TEST_F(ZipfRateLimiterImplTest, TimingVerificationTest) { - // TODO(oschaaf): fix zipf distribution based rate limiter, add the real thing. -} - } // namespace Nighthawk From 6d1e7471261e67bbf0b083d22135dff210fcfa95 Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Wed, 11 Dec 2019 10:53:27 +0100 Subject: [PATCH 11/12] Review feedback Signed-off-by: Otto van der Schaaf --- source/common/rate_limiter_impl.cc | 5 ++--- source/common/rate_limiter_impl.h | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index 7fb3d9481..89ef39af3 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -84,14 +84,13 @@ DelegatingRateLimiterImpl::DelegatingRateLimiterImpl( random_distribution_generator_(std::move(random_distribution_generator)) {} bool DelegatingRateLimiterImpl::tryAcquireOne() { - const auto now = timeSource().monotonicTime(); if (distributed_start_ == absl::nullopt) { if (rate_limiter_->tryAcquireOne()) { - distributed_start_ = now + random_distribution_generator_(); + distributed_start_ = timeSource().monotonicTime() + random_distribution_generator_(); } } - if (distributed_start_ != absl::nullopt && distributed_start_ <= now) { + if (distributed_start_ != absl::nullopt && distributed_start_ <= timeSource().monotonicTime()) { distributed_start_ = absl::nullopt; return true; } diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index 186350e8b..609507724 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -15,7 +15,9 @@ namespace Nighthawk { /** - * Rate limiter base class, which implements some shared functionality. + * Rate limiter base class, which implements some shared functionality for derivations that + * compute acquireable counts based on elapsed time. Rate limiters that apply filters,offsets + * or otherwise wrap another rate limiter should derive from ForwardingRateLimiterImpl instead. */ class RateLimiterBaseImpl : public RateLimiter { public: From 4d6448b1e80306c0c20ef3dc1c63ba16fec5f2ee Mon Sep 17 00:00:00 2001 From: Otto van der Schaaf Date: Thu, 12 Dec 2019 00:32:50 +0100 Subject: [PATCH 12/12] Review-feedback: add comments Signed-off-by: Otto van der Schaaf --- include/nighthawk/common/rate_limiter.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/include/nighthawk/common/rate_limiter.h b/include/nighthawk/common/rate_limiter.h index 4aaeaefdb..99bfef544 100644 --- a/include/nighthawk/common/rate_limiter.h +++ b/include/nighthawk/common/rate_limiter.h @@ -30,9 +30,13 @@ class RateLimiter { virtual void releaseOne() PURE; /** - * @return Envoy::TimeSource& time_source + * @return Envoy::TimeSource& time_source used to track time. */ virtual Envoy::TimeSource& timeSource() PURE; + /** + * @return std::chrono::nanoseconds elapsed since the first call to tryAcquireOne(). Used by some + * rate limiter implementations to compute acquisition rate. + */ virtual std::chrono::nanoseconds elapsed() PURE; };