Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions include/envoy/common/token_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ class TokenBucket {
virtual ~TokenBucket() {}

/**
* @param tokens supplies the number of tokens to be consumed. Default is 1.
* @return true if bucket is not empty, otherwise it returns false.
* @param tokens supplies the number of tokens to be consumed.
* @param allow_partial supplies whether the token bucket will allow consumption of less tokens
* than asked for. If allow_partial is true, the bucket contains 3 tokens,
* and the caller asks for 5, the bucket will return 3 tokens and now be
* empty.
* @return the number of tokens actually consumed.
*/
virtual bool consume(uint64_t tokens = 1) PURE;
virtual uint64_t consume(uint64_t tokens, bool allow_partial) PURE;

/**
* @return returns the approximate time until a next token is available. Currently it
* returns the upper bound on the amount of time until a next token is available.
*/
virtual uint64_t nextTokenAvailableMs() PURE;
virtual std::chrono::milliseconds nextTokenAvailable() PURE;
};

typedef std::unique_ptr<TokenBucket> TokenBucketPtr;
Expand Down
16 changes: 10 additions & 6 deletions source/common/common/token_bucket_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, d
: max_tokens_(max_tokens), fill_rate_(std::abs(fill_rate)), tokens_(max_tokens),
last_fill_(time_source.monotonicTime()), time_source_(time_source) {}

bool TokenBucketImpl::consume(uint64_t tokens) {
uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) {
if (tokens_ < max_tokens_) {
const auto time_now = time_source_.monotonicTime();
tokens_ = std::min((std::chrono::duration<double>(time_now - last_fill_).count() * fill_rate_) +
Expand All @@ -17,21 +17,25 @@ bool TokenBucketImpl::consume(uint64_t tokens) {
last_fill_ = time_now;
}

if (allow_partial) {
tokens = std::min(tokens, static_cast<uint64_t>(std::floor(tokens_)));
}

if (tokens_ < tokens) {
return false;
return 0;
}

tokens_ -= tokens;
return true;
return tokens;
}

uint64_t TokenBucketImpl::nextTokenAvailableMs() {
std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() {
// If there are tokens available, return immediately.
if (tokens_ >= 1) {
return 0;
return std::chrono::milliseconds(0);
}
// TODO(ramaraochavali): implement a more precise way that works for very low rate limits.
return (1 / fill_rate_) * 1000;
return std::chrono::milliseconds(static_cast<uint64_t>(std::ceil((1 / fill_rate_) * 1000)));
}

} // namespace Envoy
6 changes: 3 additions & 3 deletions source/common/common/token_bucket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ class TokenBucketImpl : public TokenBucket {
*/
explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1);

bool consume(uint64_t tokens = 1) override;

uint64_t nextTokenAvailableMs() override;
// TokenBucket
uint64_t consume(uint64_t tokens, bool allow_partial) override;
std::chrono::milliseconds nextTokenAvailable() override;

private:
const double max_tokens_;
Expand Down
5 changes: 2 additions & 3 deletions source/common/config/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,13 @@ class GrpcStream : public Grpc::TypedAsyncStreamCallbacks<ResponseProto>,
}

bool checkRateLimitAllowsDrain() {
if (!rate_limiting_enabled_ || limit_request_->consume()) {
if (!rate_limiting_enabled_ || limit_request_->consume(1, false)) {
return true;
}
ASSERT(drain_request_timer_ != nullptr);
control_plane_stats_.rate_limit_enforced_.inc();
// Enable the drain request timer.
drain_request_timer_->enableTimer(
std::chrono::milliseconds(limit_request_->nextTokenAvailableMs()));
drain_request_timer_->enableTimer(limit_request_->nextTokenAvailable());
return false;
}

Expand Down
54 changes: 33 additions & 21 deletions test/common/common/token_bucket_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,60 +17,72 @@ class TokenBucketImplTest : public testing::Test {
TEST_F(TokenBucketImplTest, Initialization) {
TokenBucketImpl token_bucket{1, time_system_, -1.0};

EXPECT_TRUE(token_bucket.consume());
EXPECT_FALSE(token_bucket.consume());
EXPECT_EQ(1, token_bucket.consume(1, false));
EXPECT_EQ(0, token_bucket.consume(1, false));
}

// Verifies TokenBucket's maximum capacity.
TEST_F(TokenBucketImplTest, MaxBucketSize) {
TokenBucketImpl token_bucket{3, time_system_, 1};

EXPECT_TRUE(token_bucket.consume(3));
EXPECT_EQ(3, token_bucket.consume(3, false));
time_system_.setMonotonicTime(std::chrono::seconds(10));
EXPECT_FALSE(token_bucket.consume(4));
EXPECT_TRUE(token_bucket.consume(3));
EXPECT_EQ(0, token_bucket.consume(4, false));
EXPECT_EQ(3, token_bucket.consume(3, false));
}

// Verifies that TokenBucket can consume tokens.
TEST_F(TokenBucketImplTest, Consume) {
TokenBucketImpl token_bucket{10, time_system_, 1};

EXPECT_FALSE(token_bucket.consume(20));
EXPECT_TRUE(token_bucket.consume(9));
EXPECT_EQ(0, token_bucket.consume(20, false));
EXPECT_EQ(9, token_bucket.consume(9, false));

EXPECT_TRUE(token_bucket.consume());
EXPECT_EQ(1, token_bucket.consume(1, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(999));
EXPECT_FALSE(token_bucket.consume());
EXPECT_EQ(0, token_bucket.consume(1, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(5999));
EXPECT_FALSE(token_bucket.consume(6));
EXPECT_EQ(0, token_bucket.consume(6, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(6000));
EXPECT_TRUE(token_bucket.consume(6));
EXPECT_FALSE(token_bucket.consume());
EXPECT_EQ(6, token_bucket.consume(6, false));
EXPECT_EQ(0, token_bucket.consume(1, false));
}

// Verifies that TokenBucket can refill tokens.
TEST_F(TokenBucketImplTest, Refill) {
TokenBucketImpl token_bucket{1, time_system_, 0.5};
EXPECT_TRUE(token_bucket.consume());
EXPECT_EQ(1, token_bucket.consume(1, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(500));
EXPECT_FALSE(token_bucket.consume());
EXPECT_EQ(0, token_bucket.consume(1, false));
time_system_.setMonotonicTime(std::chrono::milliseconds(1500));
EXPECT_FALSE(token_bucket.consume());
EXPECT_EQ(0, token_bucket.consume(1, false));
time_system_.setMonotonicTime(std::chrono::milliseconds(2000));
EXPECT_TRUE(token_bucket.consume());
EXPECT_EQ(1, token_bucket.consume(1, false));
}

TEST_F(TokenBucketImplTest, NextTokenAvailable) {
TokenBucketImpl token_bucket{10, time_system_, 5};
EXPECT_TRUE(token_bucket.consume(9));
EXPECT_EQ(0, token_bucket.nextTokenAvailableMs());
EXPECT_TRUE(token_bucket.consume());
EXPECT_FALSE(token_bucket.consume());
EXPECT_EQ(200, token_bucket.nextTokenAvailableMs());
EXPECT_EQ(9, token_bucket.consume(9, false));
EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable());
EXPECT_EQ(1, token_bucket.consume(1, false));
EXPECT_EQ(0, token_bucket.consume(1, false));
EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable());
}

// Test partial consumption of tokens.
TEST_F(TokenBucketImplTest, PartialConsumption) {
TokenBucketImpl token_bucket{16, time_system_, 16};
EXPECT_EQ(16, token_bucket.consume(18, true));
EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable());
time_system_.sleep(std::chrono::milliseconds(62));
EXPECT_EQ(0, token_bucket.consume(1, true));
time_system_.sleep(std::chrono::milliseconds(1));
EXPECT_EQ(1, token_bucket.consume(2, true));
EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable());
}

} // namespace Envoy
3 changes: 0 additions & 3 deletions test/mocks/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,4 @@ ReadyWatcher::~ReadyWatcher() {}
MockTimeSystem::MockTimeSystem() {}
MockTimeSystem::~MockTimeSystem() {}

MockTokenBucket::MockTokenBucket() {}
MockTokenBucket::~MockTokenBucket() {}

} // namespace Envoy
8 changes: 0 additions & 8 deletions test/mocks/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,6 @@ class MockTimeSystem : public Event::TestTimeSystem {
Event::TestRealTimeSystem real_time_; // NO_CHECK_FORMAT(real_time)
};

class MockTokenBucket : public TokenBucket {
public:
MockTokenBucket();
~MockTokenBucket();

MOCK_METHOD1(consume, bool(uint64_t));
};

// Captures absl::string_view parameters into temp strings, for use
// with gmock's SaveArg<n>. Providing an absl::string_view compiles,
// but fails because by the time you examine the saved value, its
Expand Down