Skip to content
Merged
13 changes: 13 additions & 0 deletions source/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,19 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "shared_token_bucket_impl_lib",
srcs = ["shared_token_bucket_impl.cc"],
hdrs = ["shared_token_bucket_impl.h"],
deps = [
"//include/envoy/common:time_interface",
"//source/common/common:token_bucket_impl_lib",
"//source/common/common:thread_lib",
"//source/common/common:thread_synchronizer_lib",
"//source/common/common:utility_lib",
],
)

envoy_cc_library(
name = "statusor_lib",
hdrs = ["statusor.h"],
Expand Down
49 changes: 49 additions & 0 deletions source/common/common/shared_token_bucket_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "common/common/shared_token_bucket_impl.h"

#include <chrono>

#include "common/common/lock_guard.h"

namespace Envoy {

const char SharedTokenBucketImpl::GetImplSyncPoint[] = "pre_get_impl";
const char SharedTokenBucketImpl::ResetCheckSyncPoint[] = "post_reset_check";

SharedTokenBucketImpl::SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source,
double fill_rate)
: impl_(max_tokens, time_source, fill_rate), reset_once_(false) {}

uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
Thread::LockGuard lock(mutex_);
synchronizer_.syncPoint(GetImplSyncPoint);
return getImpl().consume(tokens, allow_partial);
};

std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
Thread::LockGuard lock(mutex_);
synchronizer_.syncPoint(GetImplSyncPoint);
return getImpl().nextTokenAvailable();
};

void SharedTokenBucketImpl::reset(uint64_t num_tokens) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
Thread::LockGuard lock(mutex_);
// Don't reset if reset once before.
if (reset_once_) {
Comment thread
nitgoy marked this conversation as resolved.
return;
}
reset_once_ = true;
synchronizer_.syncPoint(ResetCheckSyncPoint);
getImpl().reset(num_tokens);
};

bool SharedTokenBucketImpl::isMutexLocked() {
auto locked = mutex_.tryLock();
if (locked) {
mutex_.unlock();
}
return !locked;
}

} // namespace Envoy
59 changes: 59 additions & 0 deletions source/common/common/shared_token_bucket_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once

#include "common/common/token_bucket_impl.h"

#include "common/common/thread.h"
#include "common/common/thread_synchronizer.h"

namespace Envoy {

/**
* A thread-safe wrapper class for TokenBucket interface.
*/
class SharedTokenBucketImpl : public TokenBucket {
public:
static const char GetImplSyncPoint[];
static const char ResetCheckSyncPoint[];
/**
* @param max_tokens supplies the maximum number of tokens in the bucket.
* @param time_source supplies the time source.
* @param fill_rate supplies the number of tokens that will return to the bucket on each second.
* The default is 1.
* @param mutex supplies the mutex object to be used to ensure thread-safety when the token bucket
* is shared. By default the class will be thread-unsafe.
*/
explicit SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source,
double fill_rate = 1);

SharedTokenBucketImpl(const SharedTokenBucketImpl&) = delete;
SharedTokenBucketImpl(SharedTokenBucketImpl&&) = delete;

// TokenBucket
uint64_t consume(uint64_t tokens, bool allow_partial)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override;

std::chrono::milliseconds nextTokenAvailable() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override;

/**
Comment thread
nitgoy marked this conversation as resolved.
* Resets the bucket to contain tokens equal to @param num_tokens
* Since the token bucket is shared, only the first reset call will work. Subsequent calls to
* reset method will be ignored.
*/
void reset(uint64_t num_tokens) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override;

// Used only for testing.
Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; };

// Returns a flag to indicate whether mutex was in lock state.
bool isMutexLocked() ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(false, mutex_);
Comment thread
nitgoy marked this conversation as resolved.
Outdated

private:
TokenBucketImpl& getImpl() { return impl_; }
Comment thread
nitgoy marked this conversation as resolved.
Outdated

Thread::MutexBasicLockable mutex_;
TokenBucketImpl impl_ ABSL_GUARDED_BY(mutex_);
bool reset_once_ ABSL_GUARDED_BY(mutex_);
mutable Thread::ThreadSynchronizer synchronizer_; // Used only for testing.
};

} // namespace Envoy
10 changes: 10 additions & 0 deletions test/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,16 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "shared_token_bucket_impl_test",
srcs = ["shared_token_bucket_impl_test.cc"],
deps = [
"//source/common/common:shared_token_bucket_impl_lib",
"//test/test_common:simulated_time_system_lib",
"//test/test_common:utility_lib",
],
)

envoy_cc_test(
name = "callback_impl_test",
srcs = ["callback_impl_test.cc"],
Expand Down
152 changes: 152 additions & 0 deletions test/common/common/shared_token_bucket_impl_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#include <chrono>

#include "common/common/shared_token_bucket_impl.h"

#include "test/test_common/simulated_time_system.h"

#include "gtest/gtest.h"

namespace Envoy {

class SharedSharedTokenBucketImplTest : public testing::Test {
Comment thread
nitgoy marked this conversation as resolved.
Outdated
protected:
Event::SimulatedTimeSystem time_system_;
};

// Verifies TokenBucket initialization.
TEST_F(SharedSharedTokenBucketImplTest, Initialization) {
SharedTokenBucketImpl token_bucket{1, time_system_, -1.0};

EXPECT_EQ(1, token_bucket.consume(1, false));
EXPECT_EQ(0, token_bucket.consume(1, false));
}

// Verifies TokenBucket's maximum capacity.
TEST_F(SharedSharedTokenBucketImplTest, MaxBucketSize) {
SharedTokenBucketImpl token_bucket{3, time_system_, 1};

EXPECT_EQ(3, token_bucket.consume(3, false));
time_system_.setMonotonicTime(std::chrono::seconds(10));
EXPECT_EQ(0, token_bucket.consume(4, false));
EXPECT_EQ(3, token_bucket.consume(3, false));
}

// Verifies that TokenBucket can consume tokens.
TEST_F(SharedSharedTokenBucketImplTest, Consume) {
SharedTokenBucketImpl token_bucket{10, time_system_, 1};

EXPECT_EQ(0, token_bucket.consume(20, false));
EXPECT_EQ(9, token_bucket.consume(9, false));

EXPECT_EQ(1, token_bucket.consume(1, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(999));
EXPECT_EQ(0, token_bucket.consume(1, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(5999));
EXPECT_EQ(0, token_bucket.consume(6, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(6000));
EXPECT_EQ(6, token_bucket.consume(6, false));
EXPECT_EQ(0, token_bucket.consume(1, false));
}

// Verifies that TokenBucket can refill tokens.
TEST_F(SharedSharedTokenBucketImplTest, Refill) {
SharedTokenBucketImpl token_bucket{1, time_system_, 0.5};
EXPECT_EQ(1, token_bucket.consume(1, false));

time_system_.setMonotonicTime(std::chrono::milliseconds(500));
EXPECT_EQ(0, token_bucket.consume(1, false));
time_system_.setMonotonicTime(std::chrono::milliseconds(1500));
EXPECT_EQ(0, token_bucket.consume(1, false));
time_system_.setMonotonicTime(std::chrono::milliseconds(2000));
EXPECT_EQ(1, token_bucket.consume(1, false));
}

TEST_F(SharedSharedTokenBucketImplTest, NextTokenAvailable) {
SharedTokenBucketImpl token_bucket{10, time_system_, 5};
EXPECT_EQ(9, token_bucket.consume(9, false));
EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable());
EXPECT_EQ(1, token_bucket.consume(1, false));
EXPECT_EQ(0, token_bucket.consume(1, false));
EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable());
}

// Test partial consumption of tokens.
TEST_F(SharedSharedTokenBucketImplTest, PartialConsumption) {
SharedTokenBucketImpl token_bucket{16, time_system_, 16};
EXPECT_EQ(16, token_bucket.consume(18, true));
EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable());
time_system_.advanceTimeWait(std::chrono::milliseconds(62));
EXPECT_EQ(0, token_bucket.consume(1, true));
time_system_.advanceTimeWait(std::chrono::milliseconds(1));
EXPECT_EQ(1, token_bucket.consume(2, true));
EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable());
}

// Test reset functionality for a shared token bucket.
TEST_F(SharedSharedTokenBucketImplTest, Reset) {
SharedTokenBucketImpl token_bucket{16, time_system_, 16};
token_bucket.synchronizer().enable();
// Start a thread and call consume. This will wait post checking reset_once flag.
token_bucket.synchronizer().waitOn(SharedTokenBucketImpl::ResetCheckSyncPoint);
std::thread thread([&] { token_bucket.reset(1); });

// Wait until the thread is actually waiting.
token_bucket.synchronizer().barrierOn(SharedTokenBucketImpl::ResetCheckSyncPoint);

// Mutex should be already locked.
EXPECT_TRUE(token_bucket.isMutexLocked());
token_bucket.synchronizer().signal(SharedTokenBucketImpl::ResetCheckSyncPoint);

thread.join();
EXPECT_FALSE(token_bucket.isMutexLocked());

EXPECT_EQ(1, token_bucket.consume(2, true));
EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable());

// Reset again. Should be ignored for shared bucket.
token_bucket.reset(5);
EXPECT_EQ(0, token_bucket.consume(5, true));
}

// Verifies that TokenBucket can consume tokens with thread safety.
TEST_F(SharedSharedTokenBucketImplTest, SynchronizedConsume) {
SharedTokenBucketImpl token_bucket{10, time_system_, 1};

token_bucket.synchronizer().enable();
// Start a thread and call consume. This will wait post lock.
token_bucket.synchronizer().waitOn(SharedTokenBucketImpl::GetImplSyncPoint);
std::thread thread([&] { EXPECT_EQ(10, token_bucket.consume(20, true)); });

// Wait until the thread is actually waiting.
token_bucket.synchronizer().barrierOn(SharedTokenBucketImpl::GetImplSyncPoint);

// Mutex should be already locked.
EXPECT_TRUE(token_bucket.isMutexLocked());
token_bucket.synchronizer().signal(SharedTokenBucketImpl::GetImplSyncPoint);
thread.join();
EXPECT_FALSE(token_bucket.isMutexLocked());
}

TEST_F(SharedSharedTokenBucketImplTest, SynchronizedNextTokenAvailable) {
SharedTokenBucketImpl token_bucket{10, time_system_, 16};

token_bucket.synchronizer().enable();
// Start a thread and call consume. This will wait post lock.
token_bucket.synchronizer().waitOn(SharedTokenBucketImpl::GetImplSyncPoint);
std::thread thread(
[&] { EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); });

// Wait until the thread is actually waiting.
token_bucket.synchronizer().barrierOn(SharedTokenBucketImpl::GetImplSyncPoint);

// Mutex should be already locked.
EXPECT_TRUE(token_bucket.isMutexLocked());
token_bucket.synchronizer().signal(SharedTokenBucketImpl::GetImplSyncPoint);
thread.join();
EXPECT_FALSE(token_bucket.isMutexLocked());
}

} // namespace Envoy