-
Notifications
You must be signed in to change notification settings - Fork 89
Linear ramping and probabilistic ramping #218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
90bd582
bf5ae1d
28011ef
be93d1a
76158b9
d29c95c
ebd857c
1d62a00
c86cb92
7fc2992
9def3cd
abda9db
4c54837
7883d59
e458ea5
e7b65c1
a8c94d8
cc4fead
ea43285
4e7eb0a
a69d811
b5b5fd2
fb85732
8a268fe
34f649b
54aa937
f34708f
58a7566
1f90162
e1fa16d
af45a0a
5c8637c
89557e8
79b641c
bec3b95
fe7bb0c
324a4ee
6329377
5cfa171
31e4cbe
9d248f2
183677c
d79f2a2
f480248
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,7 +56,7 @@ LinearRateLimiter::LinearRateLimiter(Envoy::TimeSource& time_source, const Frequ | |
| : RateLimiterBaseImpl(time_source), acquireable_count_(0), acquired_count_(0), | ||
| frequency_(frequency) { | ||
| if (frequency.value() <= 0) { | ||
| throw NighthawkException("Frequency must be > 0"); | ||
| throw NighthawkException(fmt::format("frequency must be <= 0, value: {}", frequency.value())); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -78,6 +78,45 @@ void LinearRateLimiter::releaseOne() { | |
| acquired_count_--; | ||
| } | ||
|
|
||
| // Stores the ramp duration and the target frequency, and validates both. | ||
| // Throws NighthawkException when the frequency is not greater than zero, or when | ||
| // ramp_time is zero or negative. | ||
| LinearRampingRateLimiterImpl::LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source, | ||
| const std::chrono::nanoseconds ramp_time, | ||
| const Frequency frequency) | ||
| : RateLimiterBaseImpl(time_source), ramp_time_(ramp_time), frequency_(frequency) { | ||
| if (frequency_.value() <= 0) { | ||
| throw NighthawkException(fmt::format("frequency must be > 0, value: {}", frequency.value())); | ||
| } | ||
| if (ramp_time <= 0ns) { | ||
| throw NighthawkException( | ||
| fmt::format("ramp_time must be positive, value: {}", ramp_time.count())); | ||
| } | ||
| } | ||
|
|
||
| // Attempts to acquire a single slot. | ||
| // While the ramp is in progress the number of slots released so far follows | ||
| // frequency * t^2 / (2 * ramp_time). Once the ramp has completed, slots are released | ||
| // at the full target frequency on top of what the ramp itself produced. | ||
| // @return true when a slot was acquired, false when none is available yet. | ||
| bool LinearRampingRateLimiterImpl::tryAcquireOne() { | ||
| // Fast path: hand out a slot that an earlier computation already made available. | ||
| // Only a strictly positive count means a slot is available. | ||
| if (acquireable_count_ > 0) { | ||
| acquired_count_++; | ||
| acquireable_count_--; | ||
| return true; | ||
| } | ||
|
|
||
||
| const std::chrono::nanoseconds elapsed_time = elapsed(); | ||
| const double elapsed_seconds = elapsed_time.count() / 1e9; | ||
| double expected_total = 0.0; | ||
| if (elapsed_time < ramp_time_) { | ||
| double elapsed_fraction = 1.0; | ||
| elapsed_fraction -= static_cast<double>(ramp_time_.count() - elapsed_time.count()) / | ||
| static_cast<double>(ramp_time_.count()); | ||
| const double current_frequency = elapsed_fraction * frequency_.value(); | ||
| // If we'd be at a constant pace, we can expect elapsed seconds * frequency requests. | ||
| // However, as we are linearly ramping, we can expect half of that, hence we | ||
| // divide by two. | ||
| expected_total = elapsed_seconds * current_frequency / 2.0; | ||
| } else { | ||
| // The ramp contributed half of "ramp seconds * frequency"; everything after the ramp | ||
| // is paced at the full frequency. Halving the whole elapsed time here would leave | ||
| // the limiter at half of the target frequency after the ramp. | ||
| const double ramp_seconds = ramp_time_.count() / 1e9; | ||
| expected_total = ramp_seconds * frequency_.value() / 2.0 + | ||
| (elapsed_seconds - ramp_seconds) * frequency_.value(); | ||
| } | ||
| const int64_t total = std::round(expected_total); | ||
| acquireable_count_ = total - static_cast<int64_t>(acquired_count_); | ||
| return acquireable_count_ > 0 ? tryAcquireOne() : false; | ||
| } | ||
|
|
||
| // Hands back one slot: it becomes acquireable again and no longer counts as acquired. | ||
| void LinearRampingRateLimiterImpl::releaseOne() { | ||
| acquireable_count_++; | ||
| acquired_count_--; | ||
| } | ||
|
|
||
| DelegatingRateLimiterImpl::DelegatingRateLimiterImpl( | ||
| RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator) | ||
| : ForwardingRateLimiterImpl(std::move(rate_limiter)), | ||
|
|
@@ -118,4 +157,38 @@ bool FilteringRateLimiterImpl::tryAcquireOne() { | |
| return rate_limiter_->tryAcquireOne() ? filter_() : false; | ||
| } | ||
|
|
||
| GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (fyi) Now that I read the code and understand the implementation here, I have to say that this is truly beautiful, including your use of class inheritance for the ForwardingRateLimiter. I have learned something today and I appreciate it, thanks.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is nice to hear, thanks! |
||
| const std::chrono::nanoseconds ramp_time, DiscreteNumericDistributionSamplerPtr&& provider, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we validate some of the arguments? |
||
| RateLimiterPtr&& rate_limiter) | ||
| : FilteringRateLimiterImpl( | ||
| std::move(rate_limiter), | ||
| [this]() { | ||
| const auto elapsed_time = elapsed(); | ||
| if (elapsed_time < ramp_time_) { | ||
| // We want to linearly increase the probability of returning true | ||
| // below. We can derive that from the elapsed fraction of ramp_time. | ||
| const double probability = | ||
| 1.0 - static_cast<double>(ramp_time_.count() - elapsed_time.count()) / | ||
| (ramp_time_.count() * 1.0); | ||
| // Get a random number r, where 0 < r ≤ 1. | ||
| const double random_between_0_and_1 = 1.0 * provider_->getValue() / provider_->max(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you, this is easier to follow with the new comments and the min() and max() methods. |
||
| // Given a uniform distribution, the fraction of the ramp | ||
| // will translate into the probability of opening up we are looking for. | ||
| return random_between_0_and_1 < probability; | ||
| } | ||
| // Ramping is complete, and as such this filter has completely opened up. | ||
| return true; | ||
| }), | ||
| provider_(std::move(provider)), ramp_time_(ramp_time) { | ||
| if (ramp_time <= 0ns) { | ||
| throw NighthawkException("ramp_time must be positive and > 0ns"); | ||
| } | ||
| if (provider_->min() != 1) { | ||
| throw NighthawkException("min value of the distribution provider must equal 1"); | ||
| } | ||
| if (provider_->max() != 1000000) { | ||
| throw NighthawkException("max value of the distribution provider must equal 1000000"); | ||
| } | ||
| } | ||
|
|
||
| } // namespace Nighthawk | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
|
|
||
| #include "common/frequency.h" | ||
|
|
||
| #include "absl/random/random.h" | ||
| #include "absl/types/optional.h" | ||
|
|
||
| namespace Nighthawk { | ||
|
|
@@ -55,6 +56,24 @@ class LinearRateLimiter : public RateLimiterBaseImpl, | |
| const Frequency frequency_; | ||
| }; | ||
|
|
||
| /** | ||
| * A rate limiter which linearly ramps up to the desired frequency over the specified ramp_time. | ||
| */ | ||
| class LinearRampingRateLimiterImpl : public RateLimiterBaseImpl, | ||
| public Envoy::Logger::Loggable<Envoy::Logger::Id::main> { | ||
| public: | ||
| /** | ||
| * @param time_source Time source handed to RateLimiterBaseImpl. | ||
| * @param ramp_time Duration of the ramp. Must be positive. | ||
| * @param frequency The frequency to ramp up to. Must be greater than zero. | ||
| * Invalid arguments result in a NighthawkException. | ||
| */ | ||
| LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source, | ||
| const std::chrono::nanoseconds ramp_time, const Frequency frequency); | ||
| /** | ||
| * @return bool true when a slot was acquired, false when none is available. | ||
| */ | ||
| bool tryAcquireOne() override; | ||
| /** | ||
| * Hands back one slot so that it can be acquired again. | ||
| */ | ||
| void releaseOne() override; | ||
|
|
||
||
| private: | ||
| // Slots that have been computed as available but were not handed out yet. | ||
| int64_t acquireable_count_{0}; | ||
| // Slots handed out so far; releaseOne() decrements this. | ||
| uint64_t acquired_count_{0}; | ||
| const std::chrono::nanoseconds ramp_time_; | ||
| const Frequency frequency_; | ||
| }; | ||
|
|
||
| /** | ||
| * Base for a rate limiter which wraps another rate limiter, and forwards | ||
| * some calls. | ||
|
|
@@ -123,6 +142,8 @@ class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionS | |
| UniformRandomDistributionSamplerImpl(const uint64_t upper_bound) | ||
| : distribution_(0, upper_bound) {} | ||
| uint64_t getValue() override { return distribution_(generator_); } | ||
| uint64_t min() const override { return distribution_.min(); } | ||
| uint64_t max() const override { return distribution_.max(); } | ||
|
|
||
| private: | ||
| std::default_random_engine generator_; | ||
|
|
@@ -157,4 +178,29 @@ class FilteringRateLimiterImpl : public ForwardingRateLimiterImpl, | |
| const RateLimiterFilter filter_; | ||
| }; | ||
|
|
||
| /** | ||
| * Takes a probabilistic approach to suppressing an arbitrary wrapped rate limiter. | ||
| */ | ||
| class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiterImpl { | ||
| public: | ||
| /** | ||
| * @param ramp_time Time that should elapse between moving from complete | ||
| * suppression to completely opening the wrapped rate limiter. | ||
| * @param provider Discrete numeric distribution sampler. To achieve a | ||
| * reasonable precision, the min value of this distribution MUST equal 1. The max value MUST equal | ||
| * 1000000. Configuring otherwise will result in a NighthawkException. Using a uniform | ||
| * distribution will yield an approximately linear ramp from completely closed to completely | ||
| * opened. | ||
| * @param rate_limiter The rate limiter that will be wrapped and responsible | ||
| * for generating the base acquisition pacing that we will operate on. | ||
| */ | ||
| GraduallyOpeningRateLimiterFilter(const std::chrono::nanoseconds ramp_time, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add some description of the arguments and how they affect the functionality?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this comment still applies after the latest changes. Maybe it got lost? |
||
| DiscreteNumericDistributionSamplerPtr&& provider, | ||
| RateLimiterPtr&& rate_limiter); | ||
|
|
||
| private: | ||
| DiscreteNumericDistributionSamplerPtr provider_; | ||
| const std::chrono::nanoseconds ramp_time_; | ||
| }; | ||
|
|
||
| } // namespace Nighthawk | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -172,4 +172,172 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) { | |
| EXPECT_TRUE(rate_limiter->tryAcquireOne()); | ||
| } | ||
|
|
||
| class LinearRampingRateLimiterImplTest : public Test { | ||
| public: | ||
| /** | ||
| * Steps simulated time in 10us ticks over the ramp, records when acquisitions succeed, | ||
| * and verifies them against control timings derived from ½ * a * t². | ||
| * | ||
| * @param frequency The final frequency of the ramp. | ||
| * @param duration The test (and ramp) duration. Frequency will be 0 Hz at the start and | ||
| * linearly increase as time moves forward, up to the specified frequency. | ||
| * @return std::vector<int64_t> an array containing the acquisition timings | ||
| * in microseconds. | ||
| */ | ||
| std::vector<int64_t> checkAcquisitionTimings(const Frequency frequency, | ||
| const std::chrono::seconds duration) { | ||
| Envoy::Event::SimulatedTimeSystem time_system; | ||
| std::vector<int64_t> acquisition_timings; | ||
| std::vector<int64_t> control_timings; | ||
|
|
||
||
| LinearRampingRateLimiterImpl rate_limiter(time_system, duration, frequency); | ||
| auto total_us_elapsed = 0us; | ||
| const auto clock_tick = 10us; | ||
| EXPECT_FALSE(rate_limiter.tryAcquireOne()); | ||
| do { | ||
| if (rate_limiter.tryAcquireOne()) { | ||
| EXPECT_FALSE(rate_limiter.tryAcquireOne()); | ||
| acquisition_timings.push_back(total_us_elapsed.count()); | ||
| } | ||
| // We use the second law of motion to verify results: ½ * a * t² | ||
| // In this formula, 'a' equates to our ramp speed, and t to elapsed time. | ||
| double t = total_us_elapsed.count() / 1e6; | ||
| double a = (frequency.value() / (duration.count() * 1.0)); | ||
| // Finally, figure out the ground that we can expect to be covered. | ||
| uint64_t expected_count = std::round(0.5 * a * t * t); | ||
| if (expected_count > control_timings.size()) { | ||
| control_timings.push_back(total_us_elapsed.count()); | ||
| } | ||
| time_system.sleep(clock_tick); | ||
| total_us_elapsed += clock_tick; | ||
| } while (total_us_elapsed <= duration); | ||
|
|
||
||
| // For good measure, verify we saw the expected amount of acquisitions: half | ||
| // of "frequency times duration". | ||
| EXPECT_EQ(std::round(duration.count() * frequency.value() / 2.0), acquisition_timings.size()); | ||
| // Sanity check that we have the right number of control timings. | ||
| EXPECT_EQ(control_timings.size(), acquisition_timings.size()); | ||
| // Verify that all timings are correct. | ||
| for (uint64_t i = 0; i < acquisition_timings.size(); i++) { | ||
| // We allow one clock tick of slack in timing expectations, as floating | ||
| // point math may introduce small errors in some cases. | ||
| // This is a test-only issue: in practice we don't have fixed microsecond-level step sizes, | ||
| // and the rate limiter computes at nanosecond precision internally. As we want to have | ||
| // microsecond level precision, this should be more than sufficient. | ||
| EXPECT_NEAR(acquisition_timings[i], control_timings[i], clock_tick.count()); | ||
| } | ||
| return acquisition_timings; | ||
| } | ||
| }; | ||
|
|
||
| // Verifies that the constructor rejects invalid arguments with a NighthawkException. | ||
| TEST_F(RateLimiterTest, LinearRampingRateLimiterImplInvalidArgumentTest) { | ||
| Envoy::Event::SimulatedTimeSystem time_system; | ||
| // Bad frequency: 0 Hz must be rejected. | ||
| EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, 1s, 0_Hz); | ||
| , NighthawkException); | ||
| // Bad ramp duration: both zero and negative values must be rejected. | ||
| EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, 0s, 1_Hz); | ||
| , NighthawkException); | ||
| EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, -1s, 1_Hz); | ||
| , NighthawkException); | ||
| } | ||
|
|
||
| TEST_F(LinearRampingRateLimiterImplTest, TimingVerificationTest) { | ||
| EXPECT_EQ(checkAcquisitionTimings(5_Hz, 5s), | ||
| std::vector<int64_t>({1000010, 1732060, 2236070, 2645760, 3000000, 3316630, 3605560, | ||
| 3872990, 4123110, 4358900, 4582580, 4795840, 5000000})); | ||
| checkAcquisitionTimings(1_Hz, 3s); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is indeed much better for readability, but I am debating with myself whether it doesn't decrease our coverage. Could there later be a bug that would go undetected? Should we add at least one test case that verifies the exact values as before? WDYT?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah this makes sense, done (f480248) |
||
| checkAcquisitionTimings(5_Hz, 3s); | ||
| checkAcquisitionTimings(4_Hz, 2s); | ||
| checkAcquisitionTimings(1000_Hz, 12s); | ||
| checkAcquisitionTimings(40000_Hz, 7s); | ||
| } | ||
|
|
||
| // Verifies that the constructor throws a NighthawkException for a non-positive ramp time | ||
| // and for a distribution sampler whose min or max does not have the required value. | ||
| TEST_F(RateLimiterTest, GraduallyOpeningRateLimiterFilterInvalidArgumentTest) { | ||
| // Negative ramp throws. | ||
| EXPECT_THROW(GraduallyOpeningRateLimiterFilter gorl( | ||
| -1s, std::make_unique<NiceMock<MockDiscreteNumericDistributionSampler>>(), | ||
| std::make_unique<NiceMock<MockRateLimiter>>()); | ||
| , NighthawkException); | ||
|
|
||
||
| // Zero ramp throws. | ||
| EXPECT_THROW(GraduallyOpeningRateLimiterFilter gorl( | ||
| 0s, std::make_unique<NiceMock<MockDiscreteNumericDistributionSampler>>(), | ||
| std::make_unique<NiceMock<MockRateLimiter>>()); | ||
| , NighthawkException); | ||
|
|
||
||
| // Pass in a badly configured distribution sampler: min is 0 instead of 1. | ||
| auto bad_distribution_sampler = std::make_unique<MockDiscreteNumericDistributionSampler>(); | ||
| EXPECT_CALL(*bad_distribution_sampler, min).Times(1).WillOnce(Return(0)); | ||
| EXPECT_THROW( | ||
| GraduallyOpeningRateLimiterFilter gorl(1s, std::move(bad_distribution_sampler), | ||
| std::make_unique<NiceMock<MockRateLimiter>>()); | ||
| , NighthawkException); | ||
|
|
||
||
| bad_distribution_sampler = std::make_unique<MockDiscreteNumericDistributionSampler>(); | ||
| // Correct min, but now introduce a bad max. | ||
| EXPECT_CALL(*bad_distribution_sampler, min).Times(1).WillOnce(Return(1)); | ||
| EXPECT_CALL(*bad_distribution_sampler, max).Times(1).WillOnce(Return(99)); | ||
| EXPECT_THROW( | ||
| GraduallyOpeningRateLimiterFilter gorl(1s, std::move(bad_distribution_sampler), | ||
| std::make_unique<NiceMock<MockRateLimiter>>()); | ||
| , NighthawkException); | ||
| } | ||
|
|
||
| class GraduallyOpeningRateLimiterFilterTest : public Test { | ||
| public: | ||
| /** | ||
| * Wraps a LinearRateLimiter in a GraduallyOpeningRateLimiterFilter, steps simulated time | ||
| * in 1ms ticks for the given duration and records when acquisitions succeed. Afterwards | ||
| * verifies that, one second later, a full series of acquisitions succeeds. | ||
| * | ||
| * @param frequency Frequency of the wrapped LinearRateLimiter. | ||
| * @param duration Ramp time of the filter, and the time span that gets stepped through. | ||
| * @return std::vector<int64_t> the acquisition timings in milliseconds. | ||
| */ | ||
| std::vector<int64_t> getAcquisitionTimings(const Frequency frequency, | ||
| const std::chrono::seconds duration) { | ||
| Envoy::Event::SimulatedTimeSystem time_system; | ||
| std::vector<int64_t> acquisition_timings; | ||
| // Raw pointer so expectations can be set; ownership moves to the unique_ptr created below. | ||
| auto* unsafe_discrete_numeric_distribution_sampler = | ||
| new MockDiscreteNumericDistributionSampler(); | ||
| // Fixed seed, so the sequence of sampled values is repeatable from run to run. | ||
| std::mt19937_64 mt(1243); | ||
| const uint64_t dist_min = 1; | ||
| const uint64_t dist_max = 1000000; | ||
| std::uniform_int_distribution<uint64_t> dist(dist_min, dist_max); | ||
| EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, getValue) | ||
| .Times(AtLeast(1)) | ||
| .WillRepeatedly(Invoke([&dist, &mt]() { return dist(mt); })); | ||
| EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, min) | ||
| .Times(1) | ||
| .WillOnce(Return(dist_min)); | ||
| EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, max) | ||
| .Times(AtLeast(1)) | ||
| .WillRepeatedly(Return(dist_max)); | ||
| RateLimiterPtr rate_limiter = std::make_unique<GraduallyOpeningRateLimiterFilter>( | ||
| duration, | ||
| std::unique_ptr<DiscreteNumericDistributionSampler>( | ||
| unsafe_discrete_numeric_distribution_sampler), | ||
| std::make_unique<LinearRateLimiter>(time_system, frequency)); | ||
| auto total_ms_elapsed = 0ms; | ||
| auto clock_tick = 1ms; | ||
| EXPECT_FALSE(rate_limiter->tryAcquireOne()); | ||
|
|
||
||
| // Step through the ramp, recording the time of every successful acquisition. | ||
| do { | ||
| if (rate_limiter->tryAcquireOne()) { | ||
| acquisition_timings.push_back(total_ms_elapsed.count()); | ||
| EXPECT_FALSE(rate_limiter->tryAcquireOne()); | ||
| } | ||
| time_system.sleep(clock_tick); | ||
| total_ms_elapsed += clock_tick; | ||
| } while (total_ms_elapsed <= duration); | ||
|
|
||
||
| EXPECT_FALSE(rate_limiter->tryAcquireOne()); | ||
| time_system.sleep(1s); | ||
| // Verify that after the rampup the expected constant pacing is maintained. | ||
| // Calls should be forwarded to the regular linear rate limiter algorithm with its | ||
| // corrective behavior so we can expect to acquire a series with that. | ||
| for (uint64_t i = 0; i < frequency.value(); i++) { | ||
| EXPECT_TRUE(rate_limiter->tryAcquireOne()); | ||
| } | ||
| // Verify we acquired everything. | ||
| EXPECT_FALSE(rate_limiter->tryAcquireOne()); | ||
| return acquisition_timings; | ||
| } | ||
| }; | ||
|
|
||
| // Pins the exact acquisition timings (in milliseconds) that getAcquisitionTimings yields | ||
| // for a 50 Hz wrapped limiter and a one second ramp. | ||
| TEST_F(GraduallyOpeningRateLimiterFilterTest, TimingVerificationTest) { | ||
| EXPECT_EQ(getAcquisitionTimings(50_Hz, 1s), | ||
| std::vector<int64_t>({120, 320, 380, 560, 580, 600, 620, 640, 660, 680, 700, 740, | ||
| 760, 780, 840, 860, 880, 900, 920, 940, 960, 980, 1000})); | ||
| } | ||
|
|
||
| } // namespace Nighthawk | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we document the new methods?
(optional / unrelated) Since we are here, can we document the entire interface?