Skip to content
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
90bd582
save state on ramping rate limiter
oschaaf Nov 16, 2019
bf5ae1d
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Nov 25, 2019
28011ef
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Nov 27, 2019
be93d1a
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Nov 27, 2019
76158b9
Add tests, clean up
oschaaf Nov 29, 2019
d29c95c
Refactor + LinearlyOpeningRateLimiterFilter
oschaaf Nov 29, 2019
ebd857c
save state
oschaaf Nov 30, 2019
1d62a00
save state
oschaaf Dec 2, 2019
c86cb92
Back out change to constness in Frequency
oschaaf Dec 2, 2019
7fc2992
Save state, wire in zipf / foo ZipfRateLimiter
oschaaf Dec 2, 2019
9def3cd
Some comments & tidying up
oschaaf Dec 2, 2019
abda9db
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Dec 3, 2019
4c54837
small cleanup
oschaaf Dec 3, 2019
7883d59
Update Envoy dep for access to enableHRTimer()
oschaaf Dec 3, 2019
e458ea5
Whoops, amend bad copying
oschaaf Dec 3, 2019
e7b65c1
Also, sync .bazelrc while at it
oschaaf Dec 3, 2019
a8c94d8
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Dec 6, 2019
cc4fead
Review feedback
oschaaf Dec 11, 2019
ea43285
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Dec 11, 2019
4e7eb0a
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Dec 12, 2019
a69d811
Clean up the diff
oschaaf Dec 12, 2019
b5b5fd2
Merge remote-tracking branch 'upstream/master' into update-envoy-dep-8
oschaaf Dec 12, 2019
fb85732
Update Envoy to the latest
oschaaf Dec 12, 2019
8a268fe
Move to the sha that has our target
oschaaf Dec 12, 2019
34f649b
Unbreak it
oschaaf Dec 13, 2019
54aa937
Merge remote-tracking branch 'origin/update-envoy-dep-8' into ramping…
oschaaf Dec 13, 2019
f34708f
Fix a TODO
oschaaf Dec 13, 2019
58a7566
Fix accidental comment
oschaaf Dec 13, 2019
1f90162
Fix clang-tidy issue
oschaaf Dec 13, 2019
e1fa16d
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Dec 13, 2019
af45a0a
Zipf: expose q and v arguments of the distribution
oschaaf Dec 13, 2019
5c8637c
Zipf: expose q and v arguments of the distribution
oschaaf Dec 13, 2019
89557e8
Merge branch 'ramping-linear-rate-limiter' of github.com:oschaaf/nigh…
oschaaf Dec 13, 2019
79b641c
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Dec 16, 2019
bec3b95
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Dec 19, 2019
fe7bb0c
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Dec 23, 2019
324a4ee
Remove zipf stuff
oschaaf Dec 23, 2019
6329377
Review feedback
oschaaf Dec 23, 2019
5cfa171
Review: 1ns adjustment
oschaaf Dec 23, 2019
31e4cbe
Review: partially address comments
oschaaf Dec 25, 2019
9d248f2
Review-feedback: use second law of motion
oschaaf Dec 29, 2019
183677c
clang-tidy: fix complaint
oschaaf Dec 29, 2019
d79f2a2
Merge remote-tracking branch 'upstream/master' into ramping-linear-ra…
oschaaf Jan 3, 2020
f480248
Review feedback
oschaaf Jan 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ envoy_cc_library(
"//api/client:grpc_service_lib",
"//include/nighthawk/client:client_includes",
"//include/nighthawk/common:base_includes",
"@com_google_absl//absl/random",
"@dep_hdrhistogram_c//:hdrhistogram_c",
"@envoy//source/common/common:assert_lib_with_external_headers",
"@envoy//source/common/common:lock_guard_lib_with_external_headers",
Expand Down
60 changes: 60 additions & 0 deletions source/common/rate_limiter_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,43 @@ void LinearRateLimiter::releaseOne() {
acquired_count_--;
}

LinearRampingRateLimiterImpl::LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source,
const std::chrono::nanoseconds ramp_time,
const Frequency frequency)
: RateLimiterBaseImpl(time_source), ramp_time_(ramp_time), frequency_(frequency) {
if (frequency_.value() <= 0) {
throw NighthawkException("frequency must be > 0");
Comment thread
mum4k marked this conversation as resolved.
Outdated
}
if (ramp_time <= 0ns) {
throw NighthawkException("duration must be positive");
}
}

bool LinearRampingRateLimiterImpl::tryAcquireOne() {
if (acquireable_count_) {
acquired_count_++;
return acquireable_count_--;
}
const auto elapsed_time = elapsed() + 1ns;
Comment thread
mum4k marked this conversation as resolved.
Outdated
Comment thread
mum4k marked this conversation as resolved.
Outdated
double elapsed_fraction = 1.0;
if (elapsed_time < ramp_time_) {
elapsed_fraction -= static_cast<double>(ramp_time_.count() - elapsed_time.count()) /
static_cast<double>(ramp_time_.count());
}

const double current_frequency = elapsed_fraction * (frequency_.value() * 1.0);
const double chrono_seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(elapsed_time).count();
const double total = chrono_seconds * current_frequency / 2.0;
Comment thread
mum4k marked this conversation as resolved.
Outdated
acquireable_count_ = std::round(total) - acquired_count_;
return acquireable_count_ > 0 ? tryAcquireOne() : false;
}

void LinearRampingRateLimiterImpl::releaseOne() {
acquireable_count_++;
acquired_count_--;
}

DelegatingRateLimiterImpl::DelegatingRateLimiterImpl(
RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator)
: ForwardingRateLimiterImpl(std::move(rate_limiter)),
Expand Down Expand Up @@ -118,4 +155,27 @@ bool FilteringRateLimiterImpl::tryAcquireOne() {
return rate_limiter_->tryAcquireOne() ? filter_() : false;
}

GraduallyOpeningRateLimiterFilter::GraduallyOpeningRateLimiterFilter(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(fyi) Now that I read the code and understand the implementation here, I have to say that this is truly beautiful, including your use of class inheritance for the ForwardingRateLimiter.

I have learned something today and I appreciate it, thanks.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is nice to hear, thanks!

const std::chrono::nanoseconds ramp_time, DiscreteNumericDistributionSamplerPtr&& provider,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we validate some of the arguments?

RateLimiterPtr&& rate_limiter)
: FilteringRateLimiterImpl(
std::move(rate_limiter),
[this]() {
if (elapsed() < ramp_time_) {
const double chance_percentage =
100.0 - (static_cast<double>(ramp_time_.count() - elapsed().count()) /
(ramp_time_.count() * 1.0)) *
100.0;
return std::round(provider_->getValue() / 10000.0) <= chance_percentage;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to understand the fixed division by 10k. It seems to imply some expectations about the discrete numeric distribution provider that don't seem to be specified on the API.

This may be related to my comment that we should add more documentation next to the constructor for the GraduallyOpeningRateLimiterFilter that will explain the arguments and their roles.

}
return true;
}),
provider_(std::move(provider)), ramp_time_(ramp_time) {}

ZipfRateLimiterImpl::ZipfRateLimiterImpl(RateLimiterPtr&& rate_limiter, bool deterministic,
double q, double v)
: FilteringRateLimiterImpl(std::move(rate_limiter),
[this]() { return deterministic_ ? dist_(mt_) : dist_(g_); }),
dist_(1, q, v), deterministic_(deterministic) {}

} // namespace Nighthawk
60 changes: 60 additions & 0 deletions source/common/rate_limiter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#include "common/frequency.h"

#include "absl/random/random.h"
#include "absl/random/zipf_distribution.h"
#include "absl/types/optional.h"

namespace Nighthawk {
Expand Down Expand Up @@ -55,6 +57,24 @@ class LinearRateLimiter : public RateLimiterBaseImpl,
const Frequency frequency_;
};

/**
* A rate limiter which linearly ramps up to the desired frequency over the specified period.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Would it make sense to sync the terminology? The comment refers to "the specified period" while the argument is called ramp_time.

*/
class LinearRampingRateLimiterImpl : public RateLimiterBaseImpl,
public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
LinearRampingRateLimiterImpl(Envoy::TimeSource& time_source,
const std::chrono::nanoseconds ramp_time, const Frequency frequency);
bool tryAcquireOne() override;
void releaseOne() override;

private:
int64_t acquireable_count_{0};
uint64_t acquired_count_{0};
const std::chrono::nanoseconds ramp_time_;
const Frequency frequency_;
};

/**
* Base for a rate limiter which wraps another rate limiter, and forwards
* some calls.
Expand Down Expand Up @@ -157,4 +177,44 @@ class FilteringRateLimiterImpl : public ForwardingRateLimiterImpl,
const RateLimiterFilter filter_;
};

/**
* Takes a probabilistic approach to suppressing an arbitrary wrapper rate limiter.
Comment thread
mum4k marked this conversation as resolved.
Outdated
*/
class GraduallyOpeningRateLimiterFilter : public FilteringRateLimiterImpl {
public:
GraduallyOpeningRateLimiterFilter(const std::chrono::nanoseconds ramp_time,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some description of the arguments and how they affect the functionality?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this comment still applies after the latest changes. Maybe it got lost?

DiscreteNumericDistributionSamplerPtr&& provider,
RateLimiterPtr&& rate_limiter);

private:
DiscreteNumericDistributionSamplerPtr provider_;
const std::chrono::nanoseconds ramp_time_;
};

/**
* Thin wrapper around absl::zipf_distribution that will pull zeroes and ones from the distribution
* with the intent to probabilistically suppress the wrapped rate limiter.
* This may need further consideration, because it will shoot holes in the pacing, lowering the

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you have any pre-existing discussions on this with @htuch, is this going to be an issue considering the use cases of this rate limiter? What are the use cases of this rate limiter?

* actual achieved frequency.
*/
class ZipfRateLimiterImpl : public FilteringRateLimiterImpl {
Comment thread
mum4k marked this conversation as resolved.
Outdated
public:
/**
* From the absl header associated to the zipf distribution:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also try to say what q and v are? Either something like "The parameters v and q determine the skew of the distribution." (also copied from the header).

Or copy the relevant portion from the zipf_distribution class'es comment:

zipf_distribution produces random integer-values in the range [0, k], distributed according to the discrete probability function:

P(x) = (v + x) ^ -q

* Preconditions: v > 0, q > 1
* The precondidtions are validated when NDEBUG is not defined via
* a pair of assert() directives.
* If NDEBUG is defined and either or both of these parameters take invalid
* values, the behavior of the class is undefined.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to hope that the users will read the comment or will it be better to throw when the values aren't in the expected range?

*/
ZipfRateLimiterImpl(RateLimiterPtr&& rate_limiter, bool deterministic, double q = 2.0,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional) Not sure what we prefer in this codebase. I am generally wary of boolean "flag" arguments since they produce client code that is hard to read.

E.g:
ZipfRateLimiterImpl(rate_limiter, true, v, q) // what does true mean?

Of course the client can name it by creating a local variable. We could consider shaping the API in a way that unreadable usage won't be possible, say by defining a well named enum instead.

E.g.:

enum class ZipfBehavior { ZIPF_DETERMINISTIC, ZIPF_NON_DETERMINISTIC };

// Or maybe a bit more transparent:
enum class ZipfBehavior { ZIPF_PSEUDO_RANDOM, ZIPF_RANDOM };

WDYT?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that; +1

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should have a place to put code-level guidance like this?

double v = 1.0);

private:
absl::zipf_distribution<uint64_t> dist_;
absl::InsecureBitGen g_;
std::mt19937_64 mt_;
bool deterministic_;
};

} // namespace Nighthawk
119 changes: 119 additions & 0 deletions test/rate_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,123 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) {
EXPECT_TRUE(rate_limiter->tryAcquireOne());
}

// TODO(oschaaf): once we have hr sleep, test at a higher res.
class LinearRampingRateLimiterImplTest : public Test {
public:
std::vector<int64_t> getAcquisitionTimings(const Frequency frequency,
const std::chrono::seconds duration) {
Envoy::Event::SimulatedTimeSystem time_system;
std::vector<int64_t> aquisition_timings;
LinearRampingRateLimiterImpl rate_limiter(time_system, duration, frequency);
auto total_ms_elapsed = 0ms;
const auto clock_tick = 1ms;
auto last_acquisition_timestamp = 0ms;
Comment thread
mum4k marked this conversation as resolved.
Outdated
EXPECT_FALSE(rate_limiter.tryAcquireOne());
do {
if (rate_limiter.tryAcquireOne()) {
EXPECT_FALSE(rate_limiter.tryAcquireOne());
aquisition_timings.push_back(total_ms_elapsed.count());
last_acquisition_timestamp = total_ms_elapsed;
}
time_system.sleep(clock_tick);
total_ms_elapsed += clock_tick;
} while (total_ms_elapsed <= duration);
return aquisition_timings;
}
};

TEST_F(RateLimiterTest, LinearRampingRateLimiterImplInvalidArgumentTest) {
Envoy::Event::SimulatedTimeSystem time_system;
// bad frequency
EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, 1s, 0_Hz);
, NighthawkException);
// bad ramp duration
EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, 0s, 1_Hz);
, NighthawkException);
EXPECT_THROW(LinearRampingRateLimiterImpl rate_limiter(time_system, -1s, 1_Hz);
, NighthawkException);
}

TEST_F(LinearRampingRateLimiterImplTest, TimingVerificationTest) {
EXPECT_EQ(getAcquisitionTimings(5_Hz, 5s),
std::vector<int64_t>(
{1000, 1733, 2237, 2646, 3000, 3317, 3606, 3873, 4124, 4359, 4583, 4796, 5000}));
EXPECT_EQ(getAcquisitionTimings(4_Hz, 2s), std::vector<int64_t>({708, 1225, 1582, 1871}));
}

class GraduallyOpeningRateLimiterFilterTest : public Test {
public:
std::vector<int64_t> getAcquisitionTimings(const Frequency frequency,
const std::chrono::seconds duration) {
Envoy::Event::SimulatedTimeSystem time_system;
std::vector<int64_t> aquisition_timings;
auto* unsafe_discrete_numeric_distribution_sampler =
new MockDiscreteNumericDistributionSampler();
std::mt19937_64 mt(1243);
std::uniform_int_distribution<uint64_t> dist(1, 1000000);
EXPECT_CALL(*unsafe_discrete_numeric_distribution_sampler, getValue)
.Times(AtLeast(1))
.WillRepeatedly(Invoke([&dist, &mt]() { return dist(mt); }));
RateLimiterPtr rate_limiter = std::make_unique<GraduallyOpeningRateLimiterFilter>(
duration,
std::unique_ptr<DiscreteNumericDistributionSampler>(
unsafe_discrete_numeric_distribution_sampler),
std::make_unique<LinearRateLimiter>(time_system, frequency));
auto total_ms_elapsed = 0ms;
auto clock_tick = 1ms;
EXPECT_FALSE(rate_limiter->tryAcquireOne());

do {
if (rate_limiter->tryAcquireOne()) {
aquisition_timings.push_back(total_ms_elapsed.count());
EXPECT_FALSE(rate_limiter->tryAcquireOne());
}
time_system.sleep(clock_tick);
total_ms_elapsed += clock_tick;
} while (total_ms_elapsed <= duration);

EXPECT_FALSE(rate_limiter->tryAcquireOne());
time_system.sleep(1s);
// Verify that after the rampup the expected constant pacing is maintained.
// Calls should be forwarded to the regular linear rate limiter algorithm with its
// corrective behavior so we can expect to acquire a series with that.
for (uint64_t i = 0; i < frequency.value(); i++) {
EXPECT_TRUE(rate_limiter->tryAcquireOne());
}
// Verify we acquired everything.
EXPECT_FALSE(rate_limiter->tryAcquireOne());
return aquisition_timings;
}
};

TEST_F(GraduallyOpeningRateLimiterFilterTest, TimingVerificationTest) {
EXPECT_EQ(getAcquisitionTimings(50_Hz, 1s),
std::vector<int64_t>({120, 320, 380, 560, 580, 600, 620, 640, 660, 680, 700, 740,
760, 780, 840, 860, 880, 900, 920, 940, 960, 980, 1000}));
}

class ZipfRateLimiterImplTest : public Test {};

TEST_F(ZipfRateLimiterImplTest, TimingVerificationTest) {
Envoy::Event::SimulatedTimeSystem time_system;
auto rate_limiter = std::make_unique<ZipfRateLimiterImpl>(
std::make_unique<LinearRateLimiter>(time_system, 10_Hz), true);
const std::chrono::seconds duration = 15s;
std::vector<int64_t> aquisition_timings;
auto total_ms_elapsed = 0ms;
auto clock_tick = 1ms;

do {
if (rate_limiter->tryAcquireOne()) {
aquisition_timings.push_back(total_ms_elapsed.count());
}
time_system.sleep(clock_tick);
total_ms_elapsed += clock_tick;
} while (total_ms_elapsed <= duration);
EXPECT_EQ(aquisition_timings,
std::vector<int64_t>({500, 800, 1300, 2400, 2900, 3900, 4200, 4400, 4500,
5800, 6000, 6400, 7900, 8400, 8600, 9900, 10200, 10500,
10600, 12000, 12300, 12600, 13300, 13600, 13700, 13800, 13900}));
}

} // namespace Nighthawk