Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/nighthawk/common/rate_limiter.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#pragma once

#include <chrono>
#include <memory>

#include "envoy/common/pure.h"
#include "envoy/common/time.h"

#include "absl/types/optional.h"

namespace Nighthawk {

Expand All @@ -24,6 +28,12 @@ class RateLimiter {
* Releases a controlled resource.
*/
virtual void releaseOne() PURE;

/**
* @return Envoy::TimeSource& time_source
*/
virtual Envoy::TimeSource& timeSource() PURE;
virtual std::chrono::nanoseconds elapsed() PURE;
Comment thread
htuch marked this conversation as resolved.
};

using RateLimiterPtr = std::unique_ptr<RateLimiter>;
Expand Down
47 changes: 25 additions & 22 deletions source/common/rate_limiter_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

namespace Nighthawk {

using namespace std::chrono_literals;

BurstingRateLimiter::BurstingRateLimiter(RateLimiterPtr&& rate_limiter, const uint64_t burst_size)
: rate_limiter_(std::move(rate_limiter)), burst_size_(burst_size) {
: ForwardingRateLimiterImpl(std::move(rate_limiter)), burst_size_(burst_size) {
ASSERT(burst_size_ > 0);
}

Expand Down Expand Up @@ -51,28 +53,23 @@ void BurstingRateLimiter::releaseOne() {
}

LinearRateLimiter::LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency)
: time_source_(time_source), acquireable_count_(0), acquired_count_(0), frequency_(frequency) {
: RateLimiterBaseImpl(time_source), acquireable_count_(0), acquired_count_(0),
frequency_(frequency) {
if (frequency.value() <= 0) {
throw NighthawkException("Frequency must be > 0");
}
}

bool LinearRateLimiter::tryAcquireOne() {
// TODO(oschaaf): consider adding an explicit start() call to the interface.
if (!started_) {
started_at_ = time_source_.monotonicTime();
started_ = true;
}
if (acquireable_count_ > 0) {
acquireable_count_--;
acquired_count_++;
return true;
}

const auto elapsed_since_start = time_source_.monotonicTime() - started_at_;
acquireable_count_ =
static_cast<int64_t>(std::floor(elapsed_since_start / frequency_.interval())) -
acquired_count_;
static_cast<int64_t>(std::floor(elapsed() / frequency_.interval())) - acquired_count_;
return acquireable_count_ > 0 ? tryAcquireOne() : false;
}

Expand All @@ -81,38 +78,44 @@ void LinearRateLimiter::releaseOne() {
acquired_count_--;
}

DelegatingRateLimiter::DelegatingRateLimiter(Envoy::TimeSource& time_source,
RateLimiterPtr&& rate_limiter,
RateLimiterDelegate random_distribution_generator)
: random_distribution_generator_(std::move(random_distribution_generator)),
time_source_(time_source), rate_limiter_(std::move(rate_limiter)) {}
DelegatingRateLimiterImpl::DelegatingRateLimiterImpl(
RateLimiterPtr&& rate_limiter, RateLimiterDelegate random_distribution_generator)
: ForwardingRateLimiterImpl(std::move(rate_limiter)),
random_distribution_generator_(std::move(random_distribution_generator)) {}

bool DelegatingRateLimiter::tryAcquireOne() {
bool DelegatingRateLimiterImpl::tryAcquireOne() {
if (distributed_start_ == absl::nullopt) {
if (rate_limiter_->tryAcquireOne()) {
distributed_start_ = time_source_.monotonicTime() + random_distribution_generator_();
distributed_start_ = timeSource().monotonicTime() + random_distribution_generator_();
}
}

if (distributed_start_ != absl::nullopt && distributed_start_ <= time_source_.monotonicTime()) {
if (distributed_start_ != absl::nullopt && distributed_start_ <= timeSource().monotonicTime()) {
distributed_start_ = absl::nullopt;
return true;
}

return false;
}

void DelegatingRateLimiter::releaseOne() {
void DelegatingRateLimiterImpl::releaseOne() {
distributed_start_ = absl::nullopt;
rate_limiter_->releaseOne();
}

DistributionSamplingRateLimiterImpl::DistributionSamplingRateLimiterImpl(
Envoy::TimeSource& time_source, DiscreteNumericDistributionSamplerPtr&& provider,
RateLimiterPtr&& rate_limiter)
: DelegatingRateLimiter(
time_source, std::move(rate_limiter),
DiscreteNumericDistributionSamplerPtr&& provider, RateLimiterPtr&& rate_limiter)
: DelegatingRateLimiterImpl(
std::move(rate_limiter),
[this]() { return std::chrono::duration<uint64_t, std::nano>(provider_->getValue()); }),
provider_(std::move(provider)) {}

FilteringRateLimiterImpl::FilteringRateLimiterImpl(RateLimiterPtr&& rate_limiter,
RateLimiterFilter filter)
: ForwardingRateLimiterImpl(std::move(rate_limiter)), filter_(std::move(filter)) {}

bool FilteringRateLimiterImpl::tryAcquireOne() {
return rate_limiter_->tryAcquireOne() ? filter_() : false;
}

} // namespace Nighthawk
129 changes: 92 additions & 37 deletions source/common/rate_limiter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,62 @@

namespace Nighthawk {

/**
* Rate limiter base class, which implements some shared functionality for derivations that
* compute acquireable counts based on elapsed time. Rate limiters that apply filters,offsets
* or otherwise wrap another rate limiter should derive from ForwardingRateLimiterImpl instead.
*/
class RateLimiterBaseImpl : public RateLimiter {
public:
RateLimiterBaseImpl(Envoy::TimeSource& time_source) : time_source_(time_source){};
Envoy::TimeSource& timeSource() override { return time_source_; }
std::chrono::nanoseconds elapsed() override {
// TODO(oschaaf): consider adding an explicit start() call to the interface.
const auto now = time_source_.monotonicTime();
if (start_time_ == absl::nullopt) {
start_time_ = now;
}
return now - start_time_.value();
}

private:
Envoy::TimeSource& time_source_;
absl::optional<Envoy::MonotonicTime> start_time_;
};

/**
* Simple rate limiter that will allow acquiring at a linear pace.
* The average rate is computed over a timeframe that starts at
* the first call to tryAcquireOne().
*/
class LinearRateLimiter : public RateLimiterBaseImpl,
public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency);
bool tryAcquireOne() override;
void releaseOne() override;

protected:
int64_t acquireable_count_{0};
uint64_t acquired_count_{0};
const Frequency frequency_;
};

/**
* Base for a rate limiter which wraps another rate limiter, and forwards
* some calls.
*/
class ForwardingRateLimiterImpl : public RateLimiter {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If RateLimiterBaseImpl is the base RateLimiter class that we're working with, should this rate limiter also extend from it? Would it make sense to add a comment explaining why it doesn't? If it's because RateLimiterBaseImpl is too specific to be pulled from here, I might suggest that BaseImpl is a counterintuitive name for it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see your point, but I have a hard time coming up with a better name; hopefully the comment
I added in 6d1e747 help

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that your added comment clarifies things nicely. Thank you

public:
ForwardingRateLimiterImpl(RateLimiterPtr&& rate_limiter)
: rate_limiter_(std::move(rate_limiter)) {}
Envoy::TimeSource& timeSource() override { return rate_limiter_->timeSource(); }
std::chrono::nanoseconds elapsed() override { return rate_limiter_->elapsed(); }

protected:
const RateLimiterPtr rate_limiter_;
};

/**
* BurstingRatelimiter can be wrapped around another rate limiter. It has two modes:
* 1. First it will be accumulating acquisitions by forwarding calls to the wrapped
Expand All @@ -22,68 +78,50 @@ namespace Nighthawk {
* acquisition calls (returning true and substracting from the accumulated total until
* nothing is left, after which mode 1 will be entered again).
*/
class BurstingRateLimiter : public RateLimiter,
class BurstingRateLimiter : public ForwardingRateLimiterImpl,
public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
BurstingRateLimiter(RateLimiterPtr&& rate_limiter, const uint64_t burst_size);
bool tryAcquireOne() override;
void releaseOne() override;

private:
const RateLimiterPtr rate_limiter_;
const uint64_t burst_size_;
uint64_t accumulated_{0};
bool releasing_{};
absl::optional<bool> previously_releasing_; // Solely used for sanity checking.
};

// Simple rate limiter that will allow acquiring at a linear pace.
// The average rate is computed over a timeframe that starts at
// the first call to tryAcquireOne().
class LinearRateLimiter : public RateLimiter,
public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency);
bool tryAcquireOne() override;
void releaseOne() override;

private:
Envoy::TimeSource& time_source_;
int64_t acquireable_count_;
uint64_t acquired_count_;
const Frequency frequency_;
bool started_{};
Envoy::MonotonicTime started_at_;
};

// We use an unsigned duration here to ensure only future points in time will be yielded.
// The consuming rate limiter will hold off opening up until the initial point in time plus the
// offset obtained via the delegate have transpired.
/**
* The consuming rate limiter will hold off opening up until the initial point in time plus the
* offset obtained via the delegate have transpired.
* We use an unsigned duration here to ensure only future points in time will be yielded.
*/
using RateLimiterDelegate = std::function<const std::chrono::duration<uint64_t, std::nano>()>;

// Wraps a rate limiter, and allows plugging in a delegate which will be queried to offset the
// timing of the underlying rate limiter.
class DelegatingRateLimiter : public RateLimiter,
public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
/**
* Wraps a rate limiter, and allows plugging in a delegate which will be queried to offset the
* timing of the underlying rate limiter.
*/
class DelegatingRateLimiterImpl : public ForwardingRateLimiterImpl,
public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
DelegatingRateLimiter(Envoy::TimeSource& time_source, RateLimiterPtr&& rate_limiter,
RateLimiterDelegate random_distribution_generator);
DelegatingRateLimiterImpl(RateLimiterPtr&& rate_limiter,
RateLimiterDelegate random_distribution_generator);
bool tryAcquireOne() override;
void releaseOne() override;

protected:
const RateLimiterDelegate random_distribution_generator_;

private:
Envoy::TimeSource& time_source_;
const RateLimiterPtr rate_limiter_;
absl::optional<Envoy::MonotonicTime> distributed_start_;
};

class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionSampler {
public:
UniformRandomDistributionSamplerImpl(const std::chrono::duration<uint64_t, std::nano> upper_bound)
: distribution_(0, upper_bound.count()) {}
UniformRandomDistributionSamplerImpl(const uint64_t upper_bound)
: distribution_(0, upper_bound) {}
uint64_t getValue() override { return distribution_(generator_); }

private:
Expand All @@ -92,14 +130,31 @@ class UniformRandomDistributionSamplerImpl : public DiscreteNumericDistributionS
};

// Allows adding uniformly distributed random timing offsets to an underlying rate limiter.
class DistributionSamplingRateLimiterImpl : public DelegatingRateLimiter {
class DistributionSamplingRateLimiterImpl : public DelegatingRateLimiterImpl {
public:
DistributionSamplingRateLimiterImpl(Envoy::TimeSource& time_source,
DiscreteNumericDistributionSamplerPtr&& provider,
DistributionSamplingRateLimiterImpl(DiscreteNumericDistributionSamplerPtr&& provider,
RateLimiterPtr&& rate_limiter);

private:
DiscreteNumericDistributionSamplerPtr provider_;
};

/**
* Callback used to indicate if a rate limiter release should be supressed or not.
*/
using RateLimiterFilter = std::function<bool()>;

// Wraps a rate limiter, and allows plugging in a delegate which will be queried to apply a
// filter to acquisitions.
class FilteringRateLimiterImpl : public ForwardingRateLimiterImpl,
public Envoy::Logger::Loggable<Envoy::Logger::Id::main> {
public:
FilteringRateLimiterImpl(RateLimiterPtr&& rate_limiter, RateLimiterFilter filter);
bool tryAcquireOne() override;
void releaseOne() override { rate_limiter_->releaseOne(); }

protected:
const RateLimiterFilter filter_;
};

} // namespace Nighthawk
4 changes: 4 additions & 0 deletions test/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class MockRateLimiter : public RateLimiter {

MOCK_METHOD0(tryAcquireOne, bool());
MOCK_METHOD0(releaseOne, void());
MOCK_METHOD0(timeSource, Envoy::TimeSource&());
MOCK_METHOD0(elapsed, std::chrono::nanoseconds());
};

class MockSequencer : public Sequencer {
Expand Down Expand Up @@ -186,6 +188,8 @@ class MockDiscreteNumericDistributionSampler : public DiscreteNumericDistributio
public:
MockDiscreteNumericDistributionSampler();
MOCK_METHOD0(getValue, uint64_t());
MOCK_CONST_METHOD0(min, uint64_t());
MOCK_CONST_METHOD0(max, uint64_t());
};

} // namespace Nighthawk
33 changes: 18 additions & 15 deletions test/rate_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,16 @@ class BurstingRateLimiterIntegrationTest : public Test {
const auto burst_interval_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(frequency.interval() * burst_size);

EXPECT_FALSE(rate_limiter->tryAcquireOne());
time_system.sleep(burst_interval_ms);
for (uint64_t i = 0; i < burst_size; i++) {
EXPECT_TRUE(rate_limiter->tryAcquireOne());
}
EXPECT_FALSE(rate_limiter->tryAcquireOne());
time_system.sleep(burst_interval_ms / 2);
EXPECT_FALSE(rate_limiter->tryAcquireOne());
time_system.sleep(burst_interval_ms);
for (uint64_t i = 0; i < burst_size; i++) {
EXPECT_TRUE(rate_limiter->tryAcquireOne());
for (uint64_t i = 0; i < 10000; i++) {
uint64_t burst_acquired = 0;
while (rate_limiter->tryAcquireOne()) {
burst_acquired++;
}
if (burst_acquired) {
EXPECT_EQ(burst_acquired, burst_size);
EXPECT_EQ(i % burst_interval_ms.count(), 0);
}
time_system.sleep(1ms);
}
}
};
Expand All @@ -109,9 +108,11 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplTest) {
auto mock_rate_limiter = std::make_unique<MockRateLimiter>();
MockRateLimiter& unsafe_mock_rate_limiter = *mock_rate_limiter;
Envoy::Event::SimulatedTimeSystem time_system;
EXPECT_CALL(unsafe_mock_rate_limiter, timeSource)
.Times(AtLeast(1))
.WillRepeatedly(ReturnRef(time_system));
RateLimiterPtr rate_limiter = std::make_unique<DistributionSamplingRateLimiterImpl>(
time_system, std::make_unique<UniformRandomDistributionSamplerImpl>(1ns),
std::move(mock_rate_limiter));
std::make_unique<UniformRandomDistributionSamplerImpl>(1), std::move(mock_rate_limiter));

EXPECT_CALL(unsafe_mock_rate_limiter, tryAcquireOne).Times(tries).WillRepeatedly(Return(true));
EXPECT_CALL(unsafe_mock_rate_limiter, releaseOne).Times(tries);
Expand All @@ -124,7 +125,7 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplTest) {
acquisitions++;
}
// We test the release gets propagated to the mock rate limiter.
// also, the release will force DelegatingRateLimiter to propagate tryAcquireOne.
// also, the release will force DelegatingRateLimiterImpl to propagate tryAcquireOne.
rate_limiter->releaseOne();
}
// 1 in a billion chance of failure.
Expand All @@ -139,10 +140,12 @@ TEST_F(RateLimiterTest, DistributionSamplingRateLimiterImplSchedulingTest) {
Envoy::Event::SimulatedTimeSystem time_system;
auto* unsafe_discrete_numeric_distribution_sampler = new MockDiscreteNumericDistributionSampler();
RateLimiterPtr rate_limiter = std::make_unique<DistributionSamplingRateLimiterImpl>(
time_system,
std::unique_ptr<DiscreteNumericDistributionSampler>(
unsafe_discrete_numeric_distribution_sampler),
std::move(mock_rate_limiter));
EXPECT_CALL(unsafe_mock_rate_limiter, timeSource)
.Times(AtLeast(1))
.WillRepeatedly(ReturnRef(time_system));

EXPECT_CALL(unsafe_mock_rate_limiter, tryAcquireOne)
.Times(AtLeast(1))
Expand Down