Skip to content
Merged
16 changes: 15 additions & 1 deletion include/envoy/common/token_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ class TokenBucket {
*/
virtual uint64_t consume(uint64_t tokens, bool allow_partial) PURE;

/**
* @param tokens supplies the number of tokens to be consumed.
* @param allow_partial supplies whether the token bucket will allow consumption of less tokens
* than asked for. If allow_partial is true, the bucket contains 3 tokens,
* and the caller asks for 5, the bucket will return 3 tokens and now be
* empty.
* @param time_to_next_token out param indicating the approx time until next token is available.
* @return the number of tokens actually consumed.
*/
virtual uint64_t consume(uint64_t tokens, bool allow_partial,
std::chrono::milliseconds& time_to_next_token) PURE;

/**
* @return returns the approximate time until a next token is available. Currently it
* returns the upper bound on the amount of time until a next token is available.
Expand All @@ -36,8 +48,10 @@ class TokenBucket {
/**
* Reset the bucket with a specific number of tokens. Refill will begin again from the time that
* this routine is called.
* Note: The reset call might be honored only the first time this method is called. Check the
* concrete implementation to confirm.
*/
virtual void reset(uint64_t num_tokens) PURE;
virtual void maybeReset(uint64_t num_tokens) PURE;
};

using TokenBucketPtr = std::unique_ptr<TokenBucket>;
Expand Down
13 changes: 13 additions & 0 deletions source/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,19 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "shared_token_bucket_impl_lib",
srcs = ["shared_token_bucket_impl.cc"],
hdrs = ["shared_token_bucket_impl.h"],
deps = [
"//include/envoy/common:time_interface",
"//source/common/common:thread_lib",
"//source/common/common:thread_synchronizer_lib",
"//source/common/common:token_bucket_impl_lib",
"//source/common/common:utility_lib",
],
)

envoy_cc_library(
name = "statusor_lib",
hdrs = ["statusor.h"],
Expand Down
46 changes: 46 additions & 0 deletions source/common/common/shared_token_bucket_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include "common/common/shared_token_bucket_impl.h"

#include <chrono>

#include "common/common/lock_guard.h"

namespace Envoy {

const char SharedTokenBucketImpl::GetImplSyncPoint[] = "pre_get_impl";
const char SharedTokenBucketImpl::ResetCheckSyncPoint[] = "post_reset_check";

SharedTokenBucketImpl::SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source,
double fill_rate)
: impl_(max_tokens, time_source, fill_rate), reset_once_(false) {}

uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) {
Thread::LockGuard lock(mutex_);
synchronizer_.syncPoint(GetImplSyncPoint);
return impl_.consume(tokens, allow_partial);
};

uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial,
std::chrono::milliseconds& time_to_next_token) {
Thread::LockGuard lock(mutex_);
synchronizer_.syncPoint(GetImplSyncPoint);
return impl_.consume(tokens, allow_partial, time_to_next_token);
};

std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable() {
Thread::LockGuard lock(mutex_);
synchronizer_.syncPoint(GetImplSyncPoint);
return impl_.nextTokenAvailable();
};

void SharedTokenBucketImpl::maybeReset(uint64_t num_tokens) {
Thread::LockGuard lock(mutex_);
// Don't reset if reset once before.
if (reset_once_) {
return;
}
reset_once_ = true;
synchronizer_.syncPoint(ResetCheckSyncPoint);
impl_.maybeReset(num_tokens);
};

} // namespace Envoy
51 changes: 51 additions & 0 deletions source/common/common/shared_token_bucket_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#pragma once

#include "common/common/thread.h"
#include "common/common/thread_synchronizer.h"
#include "common/common/token_bucket_impl.h"

namespace Envoy {

/**
* A thread-safe wrapper class for TokenBucket interface.
*/
class SharedTokenBucketImpl : public TokenBucket {
public:
static const char GetImplSyncPoint[];
static const char ResetCheckSyncPoint[];
/**
* @param max_tokens supplies the maximum number of tokens in the bucket.
* @param time_source supplies the time source.
* @param fill_rate supplies the number of tokens that will return to the bucket on each second.
* The default is 1.
* @param mutex supplies the mutex object to be used to ensure thread-safety when the token bucket
* is shared. By default the class will be thread-unsafe.
*/
explicit SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source,
double fill_rate = 1);

SharedTokenBucketImpl(const SharedTokenBucketImpl&) = delete;
SharedTokenBucketImpl(SharedTokenBucketImpl&&) = delete;

// TokenBucket

uint64_t consume(uint64_t tokens, bool allow_partial) override;
uint64_t consume(uint64_t tokens, bool allow_partial,
std::chrono::milliseconds& time_to_next_token) override;
std::chrono::milliseconds nextTokenAvailable() override;

/**
* Since the token bucket is shared, only the first reset call will work.
* Subsequent calls to reset method will be ignored.
*/
void maybeReset(uint64_t num_tokens) override;

private:
Thread::MutexBasicLockable mutex_;
TokenBucketImpl impl_ ABSL_GUARDED_BY(mutex_);
bool reset_once_ ABSL_GUARDED_BY(mutex_);
mutable Thread::ThreadSynchronizer synchronizer_; // Used only for testing.
friend class SharedTokenBucketImplTest;
};

} // namespace Envoy
9 changes: 8 additions & 1 deletion source/common/common/token_bucket_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) {
return tokens;
}

uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial,
std::chrono::milliseconds& time_to_next_token) {
auto tokens_consumed = consume(tokens, allow_partial);
time_to_next_token = nextTokenAvailable();
return tokens_consumed;
}

std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() {
// If there are tokens available, return immediately.
if (tokens_ >= 1) {
Expand All @@ -38,7 +45,7 @@ std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() {
return std::chrono::milliseconds(static_cast<uint64_t>(std::ceil((1 / fill_rate_) * 1000)));
}

void TokenBucketImpl::reset(uint64_t num_tokens) {
void TokenBucketImpl::maybeReset(uint64_t num_tokens) {
ASSERT(num_tokens <= max_tokens_);
tokens_ = num_tokens;
last_fill_ = time_source_.monotonicTime();
Expand Down
4 changes: 3 additions & 1 deletion source/common/common/token_bucket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ class TokenBucketImpl : public TokenBucket {

// TokenBucket
uint64_t consume(uint64_t tokens, bool allow_partial) override;
uint64_t consume(uint64_t tokens, bool allow_partial,
std::chrono::milliseconds& time_to_next_token) override;
std::chrono::milliseconds nextTokenAvailable() override;
void reset(uint64_t num_tokens) override;
void maybeReset(uint64_t num_tokens) override;

private:
const double max_tokens_;
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/fault/fault_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ void StreamRateLimiter::onTokenTimer() {
// full 1s of data right away which might not introduce enough delay for a stream that doesn't
// have enough data to span more than 1s of rate allowance). Once we reset, we will subsequently
// allow for bursting within the second to account for our data provider being bursty.
token_bucket_.reset(1);
token_bucket_.maybeReset(1);
saw_data_ = true;
}

Expand Down
10 changes: 10 additions & 0 deletions test/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,16 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "shared_token_bucket_impl_test",
srcs = ["shared_token_bucket_impl_test.cc"],
deps = [
"//source/common/common:shared_token_bucket_impl_lib",
"//test/test_common:simulated_time_system_lib",
"//test/test_common:utility_lib",
],
)

envoy_cc_test(
name = "callback_impl_test",
srcs = ["callback_impl_test.cc"],
Expand Down
Loading