diff --git a/include/nighthawk/common/rate_limiter.h b/include/nighthawk/common/rate_limiter.h
index 99bfef544..c5f5f69e8 100644
--- a/include/nighthawk/common/rate_limiter.h
+++ b/include/nighthawk/common/rate_limiter.h
@@ -48,7 +48,18 @@ using RateLimiterPtr = std::unique_ptr<RateLimiter>;
 class DiscreteNumericDistributionSampler {
 public:
   virtual ~DiscreteNumericDistributionSampler() = default;
+  /**
+   * @return uint64_t gets a sample value from the distribution.
+   */
   virtual uint64_t getValue() PURE;
+  /**
+   * @return uint64_t minimum sample value that can be returned by getValue().
+   */
+  virtual uint64_t min() const PURE;
+  /**
+   * @return uint64_t maximum sample value that can be returned by getValue().
+   */
+  virtual uint64_t max() const PURE;
 };
 
 using DiscreteNumericDistributionSamplerPtr = std::unique_ptr<DiscreteNumericDistributionSampler>;
diff --git a/source/common/BUILD b/source/common/BUILD
index 9c5588edb..86ffe2cef 100644
--- a/source/common/BUILD
+++ b/source/common/BUILD
@@ -67,6 +67,7 @@ envoy_cc_library(
         "//api/client:grpc_service_lib",
         "//include/nighthawk/client:client_includes",
         "//include/nighthawk/common:base_includes",
+        "@com_google_absl//absl/random",
         "@dep_hdrhistogram_c//:hdrhistogram_c",
         "@envoy//source/common/common:assert_lib_with_external_headers",
         "@envoy//source/common/common:lock_guard_lib_with_external_headers",
diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc
index 89ef39af3..8c8e13d13 100644
--- a/source/common/rate_limiter_impl.cc
+++ b/source/common/rate_limiter_impl.cc
@@ -56,7 +56,7 @@ LinearRateLimiter::LinearRateLimiter(Envoy::TimeSource& time_source, const Frequ
     : RateLimiterBaseImpl(time_source), acquireable_count_(0), acquired_count_(0),
       frequency_(frequency) {
   if (frequency.value() <= 0) {
-    throw NighthawkException("Frequency must be > 0");
+    throw NighthawkException(fmt::format("frequency must be > 0, value: {}", frequency.value()));
   }
 }
 
@@ -78,6 +78,78 @@ void LinearRateLimiter::releaseOne() {
   acquired_count_--;
 }
 
+/**
+ * Constructs a rate limiter that ramps up to the given frequency over ramp_time.
+ *
+ * @param time_source Time source handed to RateLimiterBaseImpl.
+ * @param ramp_time Duration of the ramp. Must be > 0ns.
+ * @param frequency Frequency to ramp up to. Must be > 0.
+ * @throws NighthawkException if frequency or ramp_time is not positive.
+ */
+LinearRampingRateLimiterImpl::LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source,
+                                                           const std::chrono::nanoseconds ramp_time,
+                                                           const Frequency frequency)
+    : RateLimiterBaseImpl(time_source), ramp_time_(ramp_time), frequency_(frequency) {
+  if (frequency_.value() <= 0) {
+    throw NighthawkException(fmt::format("frequency must be > 0, value: {}", frequency.value()));
+  }
+  if (ramp_time <= 0ns) {
+    throw NighthawkException(
+        fmt::format("ramp_time must be positive, value: {}", ramp_time.count()));
+  }
+}
+
+/**
+ * Attempts to acquire one unit.
+ *
+ * While the ramp is in progress, the number of units expected so far is the area under the
+ * linearly rising frequency. Once ramp_time has elapsed, units accrue at the full target
+ * frequency on top of what the ramp produced.
+ *
+ * @return bool true if a unit was acquired, false if none is available yet.
+ */
+bool LinearRampingRateLimiterImpl::tryAcquireOne() {
+  if (acquireable_count_ <= 0) {
+    // Nothing banked: recompute how many units should have been released by now.
+    const std::chrono::nanoseconds elapsed_time = elapsed();
+    const double elapsed_seconds = elapsed_time.count() / 1e9;
+    double expected_total = 0.0;
+    if (elapsed_time < ramp_time_) {
+      const double elapsed_fraction =
+          1.0 - static_cast<double>(ramp_time_.count() - elapsed_time.count()) /
+                    static_cast<double>(ramp_time_.count());
+      const double current_frequency = elapsed_fraction * frequency_.value();
+      // If we'd be at a constant pace, we can expect elapsed seconds * frequency requests.
+      // However, as we are linearly ramping, we can expect half of that, hence we
+      // divide by two.
+      expected_total = elapsed_seconds * current_frequency / 2.0;
+    } else {
+      // The completed ramp contributed half of "ramp seconds * frequency"; everything after
+      // it is paced at the full frequency.
+      const double ramp_seconds = ramp_time_.count() / 1e9;
+      expected_total = ramp_seconds * frequency_.value() / 2.0 +
+                       (elapsed_seconds - ramp_seconds) * frequency_.value();
+    }
+    const auto total = static_cast<int64_t>(std::round(expected_total));
+    // acquired_count_ is unsigned; convert explicitly so the subtraction stays signed.
+    acquireable_count_ = total - static_cast<int64_t>(acquired_count_);
+  }
+  if (acquireable_count_ <= 0) {
+    return false;
+  }
+  acquired_count_++;
+  acquireable_count_--;
+  return true;
+}
+
+/**
+ * Hands back a previously acquired unit so that it can be acquired again.
+ */
+void LinearRampingRateLimiterImpl::releaseOne() {
+  acquireable_count_++;
+  acquired_count_--;
+}
+
 DelegatingRateLimiterImpl::DelegatingRateLimiterImpl(
     RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator)
     : ForwardingRateLimiterImpl(std::move(rate_limiter)),
@@ -118,4 +190,51 @@ bool FilteringRateLimiterImpl::tryAcquireOne() {
   return rate_limiter_->tryAcquireOne() ? filter_() : false;
 }
 
+/**
+ * Constructs a filter that gradually opens up the wrapped rate limiter over ramp_time.
+ *
+ * @param ramp_time Time between complete suppression and completely opened. Must be > 0ns.
+ * @param provider Sampler used to decide whether an acquisition passes while ramping. Must
+ * not be null, its min() must equal 1 and its max() must equal 1000000.
+ * @param rate_limiter The rate limiter that gets wrapped.
+ * @throws NighthawkException if any of the constraints above is violated.
+ */
+GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter(
+    const std::chrono::nanoseconds ramp_time, DiscreteNumericDistributionSamplerPtr&& provider,
+    RateLimiterPtr&& rate_limiter)
+    : FilteringRateLimiterImpl(
+          std::move(rate_limiter),
+          [this]() {
+            const auto elapsed_time = elapsed();
+            if (elapsed_time < ramp_time_) {
+              // We want to linearly increase the probability of returning true
+              // below. We can derive that from the elapsed fraction of ramp_time.
+              const double probability =
+                  1.0 - static_cast<double>(ramp_time_.count() - elapsed_time.count()) /
+                            (ramp_time_.count() * 1.0);
+              // Get a random number r, where 0 < r ≤ 1.
+              const double random_between_0_and_1 = 1.0 * provider_->getValue() / provider_->max();
+              // Given a uniform distribution, the fraction of the ramp
+              // will translate into the probability of opening up we are looking for.
+              return random_between_0_and_1 < probability;
+            }
+            // Ramping is complete, and as such this filter has completely opened up.
+            return true;
+          }),
+      provider_(std::move(provider)), ramp_time_(ramp_time) {
+  if (ramp_time <= 0ns) {
+    throw NighthawkException("ramp_time must be positive and > 0ns");
+  }
+  // Guard against dereferencing a null provider in the checks below.
+  if (provider_ == nullptr) {
+    throw NighthawkException("distribution provider must not be null");
+  }
+  if (provider_->min() != 1) {
+    throw NighthawkException("min value of the distribution provider must equal 1");
+  }
+  if (provider_->max() != 1000000) {
+    throw NighthawkException("max value of the distribution provider must equal 1000000");
+  }
+}
+
 } // namespace Nighthawk
\ No newline at end of file
diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h
index 609507724..71833237a 100644
--- a/source/common/rate_limiter_impl.h
+++ b/source/common/rate_limiter_impl.h
@@ -10,6 +10,7 @@
 
 #include "common/frequency.h"
 
+#include "absl/random/random.h"
 #include "absl/types/optional.h"
 
 namespace Nighthawk {
@@ -55,6 +56,24 @@ class LinearRateLimiter : public RateLimiterBaseImpl,
   const Frequency frequency_;
 };
 
+/**
+ * A rate limiter which linearly ramps up to the desired frequency over the specified ramp_time.
+ */
+class LinearRampingRateLimiterImpl : public RateLimiterBaseImpl,
+                                     public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
+public:
+  LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source,
+                               const std::chrono::nanoseconds ramp_time, const Frequency frequency);
+  bool tryAcquireOne() override;
+  void releaseOne() override;
+
+private:
+  int64_t acquireable_count_{0};
+  uint64_t acquired_count_{0};
+  const std::chrono::nanoseconds ramp_time_;
+  const Frequency frequency_;
+};
+
 /**
  * Base for a rate limiter which wraps another rate limiter, and forwards
  * some calls.
@@ -123,6 +142,8 @@ class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionS
   UniformRandomDistributionSamplerImpl(const uint64_t upper_bound)
       : distribution_(0, upper_bound) {}
   uint64_t getValue() override { return distribution_(generator_); }
+  uint64_t min() const override { return distribution_.min(); }
+  uint64_t max() const override { return distribution_.max(); }
 
 private:
   std::default_random_engine generator_;
@@ -157,4 +178,29 @@ class FilteringRateLimiterImpl : public ForwardingRateLimiterImpl,
   const RateLimiterFilter filter_;
 };
 
+/**
+ * Takes a probabilistic approach to suppressing an arbitrary wrapped rate limiter.
+ */
+class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiterImpl {
+public:
+  /**
+   * @param ramp_time Time that should elapse between moving from complete
+   * suppression to completely opening the wrapped rate limiter.
+   * @param provider Discrete numeric distribution sampler. To achieve a
+   * reasonable precision, the min value of this distribution MUST equal 1. The max value MUST equal
+   * 1000000. Configuring otherwise will result in a NighthawkException. Using a uniform
+   * distribution will yield an approximately linear ramp from completely closed to completely
+   * opened.
+   * @param rate_limiter The rate limiter that will be wrapped and responsible
+   * for generating the base acquisition pacing that we will operate on.
+   */
+  GraduallyOpeningRateLimiterFilter(const std::chrono::nanoseconds ramp_time,
+                                    DiscreteNumericDistributionSamplerPtr&& provider,
+                                    RateLimiterPtr&& rate_limiter);
+
+private:
+  DiscreteNumericDistributionSamplerPtr provider_;
+  const std::chrono::nanoseconds ramp_time_;
+};
+
 } // namespace Nighthawk
\ No newline at end of file
diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc
index 018939998..db03c4ed3 100644
--- a/test/rate_limiter_test.cc
+++ b/test/rate_limiter_test.cc
@@ -172,4 +172,176 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) {
   EXPECT_TRUE(rate_limiter->tryAcquireOne());
 }
 
+class LinearRampingRateLimiterImplTest : public Test {
+public:
+  /**
+   * @param frequency The final frequency of the ramp.
+   * @param duration The test (and ramp) duration. Frequency will be 0 Hz at the start and
+   * linearly increase as time moves forward, up to the specified frequency.
+   * @return std::vector<int64_t> an array containing the acquisition timings
+   * in microseconds.
+   */
+  std::vector<int64_t> checkAcquisitionTimings(const Frequency frequency,
+                                               const std::chrono::seconds duration) {
+    Envoy::Event::SimulatedTimeSystem time_system;
+    std::vector<int64_t> acquisition_timings;
+    std::vector<int64_t> control_timings;
+
+    LinearRampingRateLimiterImpl rate_limiter(time_system, duration, frequency);
+    auto total_us_elapsed = 0us;
+    const auto clock_tick = 10us;
+    EXPECT_FALSE(rate_limiter.tryAcquireOne());
+    do {
+      if (rate_limiter.tryAcquireOne()) {
+        EXPECT_FALSE(rate_limiter.tryAcquireOne());
+        acquisition_timings.push_back(total_us_elapsed.count());
+      }
+      // We use the equation of motion for constant acceleration to verify results: ½ * a * t²
+      // In this formula, 'a' equates to our ramp speed, and t to elapsed time.
+      double t = total_us_elapsed.count() / 1e6;
+      double a = (frequency.value() / (duration.count() * 1.0));
+      // Finally, figure out the ground that we can expect to be covered.
+      uint64_t expected_count = std::round(0.5 * a * t * t);
+      if (expected_count > control_timings.size()) {
+        control_timings.push_back(total_us_elapsed.count());
+      }
+      time_system.sleep(clock_tick);
+      total_us_elapsed += clock_tick;
+    } while (total_us_elapsed <= duration);
+
+    // For good measure, verify we saw the expected amount of acquisitions: half
+    // of "frequency times duration".
+    EXPECT_EQ(std::round(duration.count() * frequency.value() / 2.0), acquisition_timings.size());
+    // Sanity check that we have the right number of control timings.
+    EXPECT_EQ(control_timings.size(), acquisition_timings.size());
+    // Verify that all timings are correct.
+    for (uint64_t i = 0; i < acquisition_timings.size(); i++) {
+      // We allow one clock tick of slack in timing expectations, as floating
+      // point math may introduce small errors in some cases.
+      // This is a test only issue: in practice we don't have fixed microsecond-level step sizes,
+      // and the rate limiter computes at nanosecond precision internally. As we want to have
+      // microsecond level precision, this should be more than sufficient.
+      EXPECT_NEAR(acquisition_timings[i], control_timings[i], clock_tick.count());
+    }
+    return acquisition_timings;
+  }
+};
+
+TEST_F(RateLimiterTest, LinearRampingRateLimiterImplInvalidArgumentTest) {
+  Envoy::Event::SimulatedTimeSystem time_system;
+  // bad frequency
+  EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, 1s, 0_Hz);
+               , NighthawkException);
+  // bad ramp duration
+  EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, 0s, 1_Hz);
+               , NighthawkException);
+  EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, -1s, 1_Hz);
+               , NighthawkException);
+}
+
+TEST_F(LinearRampingRateLimiterImplTest, TimingVerificationTest) {
+  EXPECT_EQ(checkAcquisitionTimings(5_Hz, 5s),
+            std::vector<int64_t>({1000010, 1732060, 2236070, 2645760, 3000000, 3316630, 3605560,
+                                  3872990, 4123110, 4358900, 4582580, 4795840, 5000000}));
+  checkAcquisitionTimings(1_Hz, 3s);
+  checkAcquisitionTimings(5_Hz, 3s);
+  checkAcquisitionTimings(4_Hz, 2s);
+  checkAcquisitionTimings(1000_Hz, 12s);
+  checkAcquisitionTimings(40000_Hz, 7s);
+}
+
+TEST_F(RateLimiterTest, GraduallyOpeningRateLimiterFilterInvalidArgumentTest) {
+  // Negative ramp throws.
+  EXPECT_THROW(GraduallyOpeningRateLimiterFilter gorl(
+                   -1s, std::make_unique<NiceMock<MockDiscreteNumericDistributionSampler>>(),
+                   std::make_unique<NiceMock<MockRateLimiter>>());
+               , NighthawkException);
+
+  // zero ramp throws.
+  EXPECT_THROW(GraduallyOpeningRateLimiterFilter gorl(
+                   0s, std::make_unique<NiceMock<MockDiscreteNumericDistributionSampler>>(),
+                   std::make_unique<NiceMock<MockRateLimiter>>());
+               , NighthawkException);
+
+  // Pass in a badly configured distribution sampler.
+  auto bad_distribution_sampler = std::make_unique<MockDiscreteNumericDistributionSampler>();
+  EXPECT_CALL(*bad_distribution_sampler, min).Times(1).WillOnce(Return(0));
+  EXPECT_THROW(
+      GraduallyOpeningRateLimiterFilter gorl(1s, std::move(bad_distribution_sampler),
+                                             std::make_unique<NiceMock<MockRateLimiter>>());
+      , NighthawkException);
+
+  bad_distribution_sampler = std::make_unique<MockDiscreteNumericDistributionSampler>();
+  // Correct min, but now introduce a bad max.
+  EXPECT_CALL(*bad_distribution_sampler, min).Times(1).WillOnce(Return(1));
+  EXPECT_CALL(*bad_distribution_sampler, max).Times(1).WillOnce(Return(99));
+  EXPECT_THROW(
+      GraduallyOpeningRateLimiterFilter gorl(1s, std::move(bad_distribution_sampler),
+                                             std::make_unique<NiceMock<MockRateLimiter>>());
+      , NighthawkException);
+}
+
+class GraduallyOpeningRateLimiterFilterTest : public Test {
+public:
+  /**
+   * Drives a GraduallyOpeningRateLimiterFilter wrapping a LinearRateLimiter through its ramp
+   * using a seeded uniform sampler, and verifies the wrapped pacing is restored afterwards.
+   *
+   * @param frequency Frequency of the wrapped LinearRateLimiter.
+   * @param duration The ramp duration of the filter.
+   * @return std::vector<int64_t> the acquisition timings observed during the ramp, in
+   * milliseconds.
+   */
+  std::vector<int64_t> getAcquisitionTimings(const Frequency frequency,
+                                             const std::chrono::seconds duration) {
+    Envoy::Event::SimulatedTimeSystem time_system;
+    std::vector<int64_t> acquisition_timings;
+    // Own the mock through a unique_ptr from the start; every expectation is configured before
+    // ownership moves into the filter, so no raw pointer has to be kept around.
+    auto distribution_sampler = std::make_unique<MockDiscreteNumericDistributionSampler>();
+    std::mt19937_64 mt(1243);
+    const uint64_t dist_min = 1;
+    const uint64_t dist_max = 1000000;
+    std::uniform_int_distribution<uint64_t> dist(dist_min, dist_max);
+    EXPECT_CALL(*distribution_sampler, getValue)
+        .Times(AtLeast(1))
+        .WillRepeatedly(Invoke([&dist, &mt]() { return dist(mt); }));
+    EXPECT_CALL(*distribution_sampler, min).Times(1).WillOnce(Return(dist_min));
+    EXPECT_CALL(*distribution_sampler, max).Times(AtLeast(1)).WillRepeatedly(Return(dist_max));
+    RateLimiterPtr rate_limiter = std::make_unique<GraduallyOpeningRateLimiterFilter>(
+        duration, std::move(distribution_sampler),
+        std::make_unique<LinearRateLimiter>(time_system, frequency));
+    auto total_ms_elapsed = 0ms;
+    auto clock_tick = 1ms;
+    EXPECT_FALSE(rate_limiter->tryAcquireOne());
+
+    do {
+      if (rate_limiter->tryAcquireOne()) {
+        acquisition_timings.push_back(total_ms_elapsed.count());
+        EXPECT_FALSE(rate_limiter->tryAcquireOne());
+      }
+      time_system.sleep(clock_tick);
+      total_ms_elapsed += clock_tick;
+    } while (total_ms_elapsed <= duration);
+
+    EXPECT_FALSE(rate_limiter->tryAcquireOne());
+    time_system.sleep(1s);
+    // Verify that after the rampup the expected constant pacing is maintained.
+    // Calls should be forwarded to the regular linear rate limiter algorithm with its
+    // corrective behavior so we can expect to acquire a series with that.
+    for (uint64_t i = 0; i < frequency.value(); i++) {
+      EXPECT_TRUE(rate_limiter->tryAcquireOne());
+    }
+    // Verify we acquired everything.
+    EXPECT_FALSE(rate_limiter->tryAcquireOne());
+    return acquisition_timings;
+  }
+};
+
+TEST_F(GraduallyOpeningRateLimiterFilterTest, TimingVerificationTest) {
+  EXPECT_EQ(getAcquisitionTimings(50_Hz, 1s),
+            std::vector<int64_t>({120, 320, 380, 560, 580, 600, 620, 640, 660, 680, 700, 740,
+                                  760, 780, 840, 860, 880, 900, 920, 940, 960, 980, 1000}));
+}
+
 } // namespace Nighthawk