From 43b002fb6a0a9cf05c708bb5e3ca58dd41239c18 Mon Sep 17 00:00:00 2001 From: Nitin Date: Thu, 7 Jan 2021 12:03:00 -0800 Subject: [PATCH 01/13] Make token bucket optionally thread-safe Signed-off-by: Nitin --- source/common/common/token_bucket_impl.cc | 14 +++++++++++-- source/common/common/token_bucket_impl.h | 24 +++++++++++++++++++---- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 5e7de9e6bb1a7..3291577f0d9a0 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -4,11 +4,14 @@ namespace Envoy { -TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate) +TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate, + absl::Mutex* mutex) : max_tokens_(max_tokens), fill_rate_(std::abs(fill_rate)), tokens_(max_tokens), - last_fill_(time_source.monotonicTime()), time_source_(time_source) {} + last_fill_(time_source.monotonicTime()), time_source_(time_source), mutex_(mutex), + reset_once_(false) {} uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { + absl::MutexLockMaybe lock(mutex_); if (tokens_ < max_tokens_) { const auto time_now = time_source_.monotonicTime(); tokens_ = std::min((std::chrono::duration(time_now - last_fill_).count() * fill_rate_) + @@ -31,6 +34,7 @@ uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { // If there are tokens available, return immediately. + absl::MutexLockMaybe lock(mutex_); if (tokens_ >= 1) { return std::chrono::milliseconds(0); } @@ -40,8 +44,14 @@ std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { void TokenBucketImpl::reset(uint64_t num_tokens) { ASSERT(num_tokens <= max_tokens_); + absl::MutexLockMaybe lock(mutex_); + // Don't reset if thread-safe i.e. shared and reset once before. + if (mutex_ && reset_once_) { + return; + } tokens_ = num_tokens; last_fill_ = time_source_.monotonicTime(); + reset_once_ = true; } } // namespace Envoy diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index 644a4185dd5ab..d82ee341633ba 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -5,10 +5,12 @@ #include "common/common/utility.h" +#include "absl/synchronization/mutex.h" + namespace Envoy { /** - * A class that implements token bucket interface (not thread-safe). + * A class that implements token bucket interface. */ class TokenBucketImpl : public TokenBucket { public: @@ -17,20 +19,34 @@ class TokenBucketImpl : public TokenBucket { * @param time_source supplies the time source. * @param fill_rate supplies the number of tokens that will return to the bucket on each second. * The default is 1. + * @param mutex supplies the mutex object to be used to ensure thread-safety when the token bucket + * is shared. By default the class will be thread-unsafe. */ - explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1); + explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1, + absl::Mutex* mutex = nullptr); + + TokenBucketImpl(const TokenBucketImpl&) = delete; + TokenBucketImpl(TokenBucketImpl&&) = delete; // TokenBucket uint64_t consume(uint64_t tokens, bool allow_partial) override; std::chrono::milliseconds nextTokenAvailable() override; + + /** + * Resets the bucket to contain tokens equal to @param num_tokens + * When the token bucket is shared, only the first reset call will work. Subsequent calls to reset + * method will be ignored. + */ void reset(uint64_t num_tokens) override; private: const double max_tokens_; const double fill_rate_; - double tokens_; - MonotonicTime last_fill_; + double tokens_ ABSL_GUARDED_BY(mutex_); + MonotonicTime last_fill_ ABSL_GUARDED_BY(mutex_); TimeSource& time_source_; + absl::Mutex* const mutex_; + bool reset_once_; }; } // namespace Envoy From 9e3e285ccef414b5f4584d17921ca5fdf1615bb7 Mon Sep 17 00:00:00 2001 From: Nitin Date: Tue, 26 Jan 2021 04:05:05 -0800 Subject: [PATCH 02/13] add synch test Signed-off-by: Nitin --- source/common/common/BUILD | 2 + source/common/common/token_bucket_impl.cc | 15 +++- source/common/common/token_bucket_impl.h | 20 +++-- test/common/common/token_bucket_impl_test.cc | 77 ++++++++++++++++++++ 4 files changed, 103 insertions(+), 11 deletions(-) diff --git a/source/common/common/BUILD b/source/common/common/BUILD index 89e41ae0c7b1d..3fe1ef346b6c1 100644 --- a/source/common/common/BUILD +++ b/source/common/common/BUILD @@ -397,6 +397,8 @@ envoy_cc_library( deps = [ "//include/envoy/common:time_interface", "//include/envoy/common:token_bucket_interface", + "//source/common/common:thread_lib", + "//source/common/common:thread_synchronizer_lib", "//source/common/common:utility_lib", ], ) diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 3291577f0d9a0..b50c78b04acdd 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -1,17 +1,22 @@ #include "common/common/token_bucket_impl.h" +#include "common/common/lock_guard.h" #include namespace Envoy { +const char TokenBucketImpl::MutexLockedSyncPoint[] = "post_lock"; + TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate, - absl::Mutex* mutex) + Thread::MutexBasicLockable* mutex) : max_tokens_(max_tokens), fill_rate_(std::abs(fill_rate)), tokens_(max_tokens), last_fill_(time_source.monotonicTime()), time_source_(time_source), mutex_(mutex), reset_once_(false) {} uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { - absl::MutexLockMaybe lock(mutex_); + Thread::OptionalLockGuard lock(mutex_); + // absl::MutexLockMaybe lock(mutex_); + synchronizer_.syncPoint(MutexLockedSyncPoint); if (tokens_ < max_tokens_) { const auto time_now = time_source_.monotonicTime(); tokens_ = std::min((std::chrono::duration(time_now - last_fill_).count() * fill_rate_) + @@ -34,7 +39,8 @@ uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { // If there are tokens available, return immediately. - absl::MutexLockMaybe lock(mutex_); + Thread::OptionalLockGuard lock(mutex_); + synchronizer_.syncPoint(MutexLockedSyncPoint); if (tokens_ >= 1) { return std::chrono::milliseconds(0); } @@ -44,7 +50,8 @@ std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { void TokenBucketImpl::reset(uint64_t num_tokens) { ASSERT(num_tokens <= max_tokens_); - absl::MutexLockMaybe lock(mutex_); + Thread::OptionalLockGuard lock(mutex_); + synchronizer_.syncPoint(MutexLockedSyncPoint); // Don't reset if thread-safe i.e. shared and reset once before. if (mutex_ && reset_once_) { return; diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index d82ee341633ba..b39f8cd461693 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -3,10 +3,10 @@ #include "envoy/common/time.h" #include "envoy/common/token_bucket.h" +#include "common/common/thread.h" +#include "common/common/thread_synchronizer.h" #include "common/common/utility.h" -#include "absl/synchronization/mutex.h" - namespace Envoy { /** @@ -14,6 +14,7 @@ namespace Envoy { */ class TokenBucketImpl : public TokenBucket { public: + static const char MutexLockedSyncPoint[]; /** * @param max_tokens supplies the maximum number of tokens in the bucket. * @param time_source supplies the time source. @@ -23,21 +24,25 @@ class TokenBucketImpl : public TokenBucket { * is shared. By default the class will be thread-unsafe. */ explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1, - absl::Mutex* mutex = nullptr); + Thread::MutexBasicLockable* mutex = nullptr); TokenBucketImpl(const TokenBucketImpl&) = delete; TokenBucketImpl(TokenBucketImpl&&) = delete; // TokenBucket - uint64_t consume(uint64_t tokens, bool allow_partial) override; - std::chrono::milliseconds nextTokenAvailable() override; + uint64_t consume(uint64_t tokens, bool allow_partial) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + std::chrono::milliseconds nextTokenAvailable() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; /** * Resets the bucket to contain tokens equal to @param num_tokens * When the token bucket is shared, only the first reset call will work. Subsequent calls to reset * method will be ignored. */ - void reset(uint64_t num_tokens) override; + void reset(uint64_t num_tokens) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + + // Used only for testing. + Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; }; private: const double max_tokens_; @@ -45,8 +50,9 @@ class TokenBucketImpl : public TokenBucket { double tokens_ ABSL_GUARDED_BY(mutex_); MonotonicTime last_fill_ ABSL_GUARDED_BY(mutex_); TimeSource& time_source_; - absl::Mutex* const mutex_; + Thread::MutexBasicLockable* mutex_; bool reset_once_; + mutable Thread::ThreadSynchronizer synchronizer_; // Used only for testing. }; } // namespace Envoy diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index 8e2b6b2b27a59..e130300cbd485 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -11,6 +11,14 @@ namespace Envoy { class TokenBucketImplTest : public testing::Test { protected: Event::SimulatedTimeSystem time_system_; + + bool isMutexLocked(Thread::MutexBasicLockable& mutex) { + auto locked = mutex.tryLock(); + if (locked) { + mutex.unlock(); + } + return !locked; + } }; // Verifies TokenBucket initialization. @@ -91,6 +99,75 @@ TEST_F(TokenBucketImplTest, Reset) { token_bucket.reset(1); EXPECT_EQ(1, token_bucket.consume(2, true)); EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); + + // Reset again. Should be honored. + token_bucket.reset(5); + EXPECT_EQ(5, token_bucket.consume(5, true)); +} + +// Verifies that TokenBucket can consume tokens with thread safety. +TEST_F(TokenBucketImplTest, SharedBucketSynchronizedConsume) { + Thread::MutexBasicLockable mutex; + TokenBucketImpl token_bucket{10, time_system_, 1, &mutex}; + + token_bucket.synchronizer().enable(); + // Start a thread and call consume. This will wait post lock. + token_bucket.synchronizer().waitOn(TokenBucketImpl::MutexLockedSyncPoint); + std::thread thread([&] { EXPECT_EQ(10, token_bucket.consume(20, true)); }); + + // Wait until the thread is actually waiting. + token_bucket.synchronizer().barrierOn(TokenBucketImpl::MutexLockedSyncPoint); + + // Mutex should be already locked. + EXPECT_TRUE(isMutexLocked(mutex)); + token_bucket.synchronizer().signal(TokenBucketImpl::MutexLockedSyncPoint); + thread.join(); + EXPECT_FALSE(isMutexLocked(mutex)); +} + +TEST_F(TokenBucketImplTest, SharedBucketNextTokenAvailable) { + Thread::MutexBasicLockable mutex; + TokenBucketImpl token_bucket{10, time_system_, 16, &mutex}; + + token_bucket.synchronizer().enable(); + // Start a thread and call consume. This will wait post lock. + token_bucket.synchronizer().waitOn(TokenBucketImpl::MutexLockedSyncPoint); + std::thread thread( + [&] { EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); }); + + // Wait until the thread is actually waiting. + token_bucket.synchronizer().barrierOn(TokenBucketImpl::MutexLockedSyncPoint); + + // Mutex should be already locked. + EXPECT_TRUE(isMutexLocked(mutex)); + token_bucket.synchronizer().signal(TokenBucketImpl::MutexLockedSyncPoint); + thread.join(); + EXPECT_FALSE(isMutexLocked(mutex)); +} + +// Test reset functionality for a shared token bucket. +TEST_F(TokenBucketImplTest, SharedBucketReset) { + Thread::MutexBasicLockable mutex; + TokenBucketImpl token_bucket{16, time_system_, 16, &mutex}; + token_bucket.synchronizer().enable(); + // Start a thread and call consume. This will wait post lock. + token_bucket.synchronizer().waitOn(TokenBucketImpl::MutexLockedSyncPoint); + std::thread thread([&] { token_bucket.reset(1); }); + // Wait until the thread is actually waiting. + token_bucket.synchronizer().barrierOn(TokenBucketImpl::MutexLockedSyncPoint); + + // Mutex should be already locked. + EXPECT_TRUE(isMutexLocked(mutex)); + token_bucket.synchronizer().signal(TokenBucketImpl::MutexLockedSyncPoint); + thread.join(); + EXPECT_FALSE(isMutexLocked(mutex)); + + EXPECT_EQ(1, token_bucket.consume(2, true)); + EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); + + // Reset again. Should be ignored for shared bucket. + token_bucket.reset(5); + EXPECT_EQ(0, token_bucket.consume(5, true)); } } // namespace Envoy From b84a6f105bee97994aa7614c84e67d2dc1190e4a Mon Sep 17 00:00:00 2001 From: Nitin Date: Tue, 26 Jan 2021 04:29:28 -0800 Subject: [PATCH 03/13] format fix Signed-off-by: Nitin --- source/common/common/token_bucket_impl.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index b50c78b04acdd..86a848da7474c 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -1,8 +1,9 @@ #include "common/common/token_bucket_impl.h" -#include "common/common/lock_guard.h" #include +#include "common/common/lock_guard.h" + namespace Envoy { const char TokenBucketImpl::MutexLockedSyncPoint[] = "post_lock"; From 2a0a9f416f43414edd950f467aae96ef3c5bf3cf Mon Sep 17 00:00:00 2001 From: Nitin Date: Tue, 26 Jan 2021 05:15:50 -0800 Subject: [PATCH 04/13] remove absl guard macros Signed-off-by: Nitin --- source/common/common/token_bucket_impl.h | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index b39f8cd461693..31d77de4f1602 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -30,16 +30,15 @@ class TokenBucketImpl : public TokenBucket { TokenBucketImpl(TokenBucketImpl&&) = delete; // TokenBucket - uint64_t consume(uint64_t tokens, bool allow_partial) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; - std::chrono::milliseconds nextTokenAvailable() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + uint64_t consume(uint64_t tokens, bool allow_partial) override; + std::chrono::milliseconds nextTokenAvailable() override; /** * Resets the bucket to contain tokens equal to @param num_tokens * When the token bucket is shared, only the first reset call will work. Subsequent calls to reset * method will be ignored. */ - void reset(uint64_t num_tokens) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + void reset(uint64_t num_tokens) override; // Used only for testing. Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; }; From 27b8d4efd2bf9953b3611eb8bef43d746b1085e6 Mon Sep 17 00:00:00 2001 From: Nitin Date: Tue, 26 Jan 2021 12:35:44 -0800 Subject: [PATCH 05/13] create separate wrapper class Signed-off-by: Nitin --- source/common/common/BUILD | 11 ++ .../common/common/shared_token_bucket_impl.cc | 49 ++++++ .../common/common/shared_token_bucket_impl.h | 59 +++++++ source/common/common/token_bucket_impl.cc | 22 +-- source/common/common/token_bucket_impl.h | 29 +--- test/common/common/BUILD | 10 ++ .../common/shared_token_bucket_impl_test.cc | 152 ++++++++++++++++++ test/common/common/token_bucket_impl_test.cc | 77 --------- 8 files changed, 287 insertions(+), 122 deletions(-) create mode 100644 source/common/common/shared_token_bucket_impl.cc create mode 100644 source/common/common/shared_token_bucket_impl.h create mode 100644 test/common/common/shared_token_bucket_impl_test.cc diff --git a/source/common/common/BUILD b/source/common/common/BUILD index 3fe1ef346b6c1..f3b54fac567af 100644 --- a/source/common/common/BUILD +++ b/source/common/common/BUILD @@ -397,6 +397,17 @@ envoy_cc_library( deps = [ "//include/envoy/common:time_interface", "//include/envoy/common:token_bucket_interface", + "//source/common/common:utility_lib", + ], +) + +envoy_cc_library( + name = "shared_token_bucket_impl_lib", + srcs = ["shared_token_bucket_impl.cc"], + hdrs = ["shared_token_bucket_impl.h"], + deps = [ + "//include/envoy/common:time_interface", + "//source/common/common:token_bucket_impl_lib", "//source/common/common:thread_lib", "//source/common/common:thread_synchronizer_lib", "//source/common/common:utility_lib", diff --git a/source/common/common/shared_token_bucket_impl.cc b/source/common/common/shared_token_bucket_impl.cc new file mode 100644 index 0000000000000..45b047eb9fc28 --- /dev/null +++ b/source/common/common/shared_token_bucket_impl.cc @@ -0,0 +1,49 @@ +#include "common/common/shared_token_bucket_impl.h" + +#include + +#include "common/common/lock_guard.h" + +namespace Envoy { + +const char SharedTokenBucketImpl::GetImplSyncPoint[] = "pre_get_impl"; +const char SharedTokenBucketImpl::ResetCheckSyncPoint[] = "post_reset_check"; + +SharedTokenBucketImpl::SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, + double fill_rate) + : impl_(max_tokens, time_source, fill_rate), reset_once_(false) {} + +uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + Thread::LockGuard lock(mutex_); + synchronizer_.syncPoint(GetImplSyncPoint); + return getImpl().consume(tokens, allow_partial); +}; + +std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + Thread::LockGuard lock(mutex_); + synchronizer_.syncPoint(GetImplSyncPoint); + return getImpl().nextTokenAvailable(); +}; + +void SharedTokenBucketImpl::reset(uint64_t num_tokens) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { + Thread::LockGuard lock(mutex_); + // Don't reset if reset once before. + if (reset_once_) { + return; + } + reset_once_ = true; + synchronizer_.syncPoint(ResetCheckSyncPoint); + getImpl().reset(num_tokens); +}; + +bool SharedTokenBucketImpl::isMutexLocked() { + auto locked = mutex_.tryLock(); + if (locked) { + mutex_.unlock(); + } + return !locked; +} + +} // namespace Envoy diff --git a/source/common/common/shared_token_bucket_impl.h b/source/common/common/shared_token_bucket_impl.h new file mode 100644 index 0000000000000..4e4a432532234 --- /dev/null +++ b/source/common/common/shared_token_bucket_impl.h @@ -0,0 +1,59 @@ +#pragma once + +#include "common/common/token_bucket_impl.h" + +#include "common/common/thread.h" +#include "common/common/thread_synchronizer.h" + +namespace Envoy { + +/** + * A thread-safe wrapper class for TokenBucket interface. + */ +class SharedTokenBucketImpl : public TokenBucket { +public: + static const char GetImplSyncPoint[]; + static const char ResetCheckSyncPoint[]; + /** + * @param max_tokens supplies the maximum number of tokens in the bucket. + * @param time_source supplies the time source. + * @param fill_rate supplies the number of tokens that will return to the bucket on each second. + * The default is 1. + * @param mutex supplies the mutex object to be used to ensure thread-safety when the token bucket + * is shared. By default the class will be thread-unsafe. + */ + explicit SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, + double fill_rate = 1); + + SharedTokenBucketImpl(const SharedTokenBucketImpl&) = delete; + SharedTokenBucketImpl(SharedTokenBucketImpl&&) = delete; + + // TokenBucket + uint64_t consume(uint64_t tokens, bool allow_partial) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + + std::chrono::milliseconds nextTokenAvailable() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + + /** + * Resets the bucket to contain tokens equal to @param num_tokens + * Since the token bucket is shared, only the first reset call will work. Subsequent calls to + * reset method will be ignored. + */ + void reset(uint64_t num_tokens) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + + // Used only for testing. + Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; }; + + // Returns a flag to indicate whether mutex was in lock state. + bool isMutexLocked() ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(false, mutex_); + +private: + TokenBucketImpl& getImpl() { return impl_; } + + Thread::MutexBasicLockable mutex_; + TokenBucketImpl impl_ ABSL_GUARDED_BY(mutex_); + bool reset_once_ ABSL_GUARDED_BY(mutex_); + mutable Thread::ThreadSynchronizer synchronizer_; // Used only for testing. +}; + +} // namespace Envoy diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 86a848da7474c..5e7de9e6bb1a7 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -2,22 +2,13 @@ #include -#include "common/common/lock_guard.h" - namespace Envoy { -const char TokenBucketImpl::MutexLockedSyncPoint[] = "post_lock"; - -TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate, - Thread::MutexBasicLockable* mutex) +TokenBucketImpl::TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate) : max_tokens_(max_tokens), fill_rate_(std::abs(fill_rate)), tokens_(max_tokens), - last_fill_(time_source.monotonicTime()), time_source_(time_source), mutex_(mutex), - reset_once_(false) {} + last_fill_(time_source.monotonicTime()), time_source_(time_source) {} uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { - Thread::OptionalLockGuard lock(mutex_); - // absl::MutexLockMaybe lock(mutex_); - synchronizer_.syncPoint(MutexLockedSyncPoint); if (tokens_ < max_tokens_) { const auto time_now = time_source_.monotonicTime(); tokens_ = std::min((std::chrono::duration(time_now - last_fill_).count() * fill_rate_) + @@ -40,8 +31,6 @@ uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { // If there are tokens available, return immediately. - Thread::OptionalLockGuard lock(mutex_); - synchronizer_.syncPoint(MutexLockedSyncPoint); if (tokens_ >= 1) { return std::chrono::milliseconds(0); } @@ -51,15 +40,8 @@ std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { void TokenBucketImpl::reset(uint64_t num_tokens) { ASSERT(num_tokens <= max_tokens_); - Thread::OptionalLockGuard lock(mutex_); - synchronizer_.syncPoint(MutexLockedSyncPoint); - // Don't reset if thread-safe i.e. shared and reset once before. - if (mutex_ && reset_once_) { - return; - } tokens_ = num_tokens; last_fill_ = time_source_.monotonicTime(); - reset_once_ = true; } } // namespace Envoy diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index 31d77de4f1602..644a4185dd5ab 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -3,55 +3,34 @@ #include "envoy/common/time.h" #include "envoy/common/token_bucket.h" -#include "common/common/thread.h" -#include "common/common/thread_synchronizer.h" #include "common/common/utility.h" namespace Envoy { /** - * A class that implements token bucket interface. + * A class that implements token bucket interface (not thread-safe). */ class TokenBucketImpl : public TokenBucket { public: - static const char MutexLockedSyncPoint[]; /** * @param max_tokens supplies the maximum number of tokens in the bucket. * @param time_source supplies the time source. * @param fill_rate supplies the number of tokens that will return to the bucket on each second. * The default is 1. - * @param mutex supplies the mutex object to be used to ensure thread-safety when the token bucket - * is shared. By default the class will be thread-unsafe. */ - explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1, - Thread::MutexBasicLockable* mutex = nullptr); - - TokenBucketImpl(const TokenBucketImpl&) = delete; - TokenBucketImpl(TokenBucketImpl&&) = delete; + explicit TokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, double fill_rate = 1); // TokenBucket uint64_t consume(uint64_t tokens, bool allow_partial) override; std::chrono::milliseconds nextTokenAvailable() override; - - /** - * Resets the bucket to contain tokens equal to @param num_tokens - * When the token bucket is shared, only the first reset call will work. Subsequent calls to reset - * method will be ignored. - */ void reset(uint64_t num_tokens) override; - // Used only for testing. - Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; }; - private: const double max_tokens_; const double fill_rate_; - double tokens_ ABSL_GUARDED_BY(mutex_); - MonotonicTime last_fill_ ABSL_GUARDED_BY(mutex_); + double tokens_; + MonotonicTime last_fill_; TimeSource& time_source_; - Thread::MutexBasicLockable* mutex_; - bool reset_once_; - mutable Thread::ThreadSynchronizer synchronizer_; // Used only for testing. }; } // namespace Envoy diff --git a/test/common/common/BUILD b/test/common/common/BUILD index c002da716b0a6..0f50ca3d0d627 100644 --- a/test/common/common/BUILD +++ b/test/common/common/BUILD @@ -263,6 +263,16 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "shared_token_bucket_impl_test", + srcs = ["shared_token_bucket_impl_test.cc"], + deps = [ + "//source/common/common:shared_token_bucket_impl_lib", + "//test/test_common:simulated_time_system_lib", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "callback_impl_test", srcs = ["callback_impl_test.cc"], diff --git a/test/common/common/shared_token_bucket_impl_test.cc b/test/common/common/shared_token_bucket_impl_test.cc new file mode 100644 index 0000000000000..50c292ac2f94e --- /dev/null +++ b/test/common/common/shared_token_bucket_impl_test.cc @@ -0,0 +1,152 @@ +#include + +#include "common/common/shared_token_bucket_impl.h" + +#include "test/test_common/simulated_time_system.h" + +#include "gtest/gtest.h" + +namespace Envoy { + +class SharedSharedTokenBucketImplTest : public testing::Test { +protected: + Event::SimulatedTimeSystem time_system_; +}; + +// Verifies TokenBucket initialization. +TEST_F(SharedSharedTokenBucketImplTest, Initialization) { + SharedTokenBucketImpl token_bucket{1, time_system_, -1.0}; + + EXPECT_EQ(1, token_bucket.consume(1, false)); + EXPECT_EQ(0, token_bucket.consume(1, false)); +} + +// Verifies TokenBucket's maximum capacity. +TEST_F(SharedSharedTokenBucketImplTest, MaxBucketSize) { + SharedTokenBucketImpl token_bucket{3, time_system_, 1}; + + EXPECT_EQ(3, token_bucket.consume(3, false)); + time_system_.setMonotonicTime(std::chrono::seconds(10)); + EXPECT_EQ(0, token_bucket.consume(4, false)); + EXPECT_EQ(3, token_bucket.consume(3, false)); +} + +// Verifies that TokenBucket can consume tokens. +TEST_F(SharedSharedTokenBucketImplTest, Consume) { + SharedTokenBucketImpl token_bucket{10, time_system_, 1}; + + EXPECT_EQ(0, token_bucket.consume(20, false)); + EXPECT_EQ(9, token_bucket.consume(9, false)); + + EXPECT_EQ(1, token_bucket.consume(1, false)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(999)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(5999)); + EXPECT_EQ(0, token_bucket.consume(6, false)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(6000)); + EXPECT_EQ(6, token_bucket.consume(6, false)); + EXPECT_EQ(0, token_bucket.consume(1, false)); +} + +// Verifies that TokenBucket can refill tokens. +TEST_F(SharedSharedTokenBucketImplTest, Refill) { + SharedTokenBucketImpl token_bucket{1, time_system_, 0.5}; + EXPECT_EQ(1, token_bucket.consume(1, false)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(500)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + time_system_.setMonotonicTime(std::chrono::milliseconds(1500)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + time_system_.setMonotonicTime(std::chrono::milliseconds(2000)); + EXPECT_EQ(1, token_bucket.consume(1, false)); +} + +TEST_F(SharedSharedTokenBucketImplTest, NextTokenAvailable) { + SharedTokenBucketImpl token_bucket{10, time_system_, 5}; + EXPECT_EQ(9, token_bucket.consume(9, false)); + EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); + EXPECT_EQ(1, token_bucket.consume(1, false)); + EXPECT_EQ(0, token_bucket.consume(1, false)); + EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); +} + +// Test partial consumption of tokens. +TEST_F(SharedSharedTokenBucketImplTest, PartialConsumption) { + SharedTokenBucketImpl token_bucket{16, time_system_, 16}; + EXPECT_EQ(16, token_bucket.consume(18, true)); + EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); + time_system_.advanceTimeWait(std::chrono::milliseconds(62)); + EXPECT_EQ(0, token_bucket.consume(1, true)); + time_system_.advanceTimeWait(std::chrono::milliseconds(1)); + EXPECT_EQ(1, token_bucket.consume(2, true)); + EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); +} + +// Test reset functionality for a shared token bucket. +TEST_F(SharedSharedTokenBucketImplTest, Reset) { + SharedTokenBucketImpl token_bucket{16, time_system_, 16}; + token_bucket.synchronizer().enable(); + // Start a thread and call consume. This will wait post checking reset_once flag. + token_bucket.synchronizer().waitOn(SharedTokenBucketImpl::ResetCheckSyncPoint); + std::thread thread([&] { token_bucket.reset(1); }); + + // Wait until the thread is actually waiting. + token_bucket.synchronizer().barrierOn(SharedTokenBucketImpl::ResetCheckSyncPoint); + + // Mutex should be already locked. + EXPECT_TRUE(token_bucket.isMutexLocked()); + token_bucket.synchronizer().signal(SharedTokenBucketImpl::ResetCheckSyncPoint); + + thread.join(); + EXPECT_FALSE(token_bucket.isMutexLocked()); + + EXPECT_EQ(1, token_bucket.consume(2, true)); + EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); + + // Reset again. Should be ignored for shared bucket. + token_bucket.reset(5); + EXPECT_EQ(0, token_bucket.consume(5, true)); +} + +// Verifies that TokenBucket can consume tokens with thread safety. +TEST_F(SharedSharedTokenBucketImplTest, SynchronizedConsume) { + SharedTokenBucketImpl token_bucket{10, time_system_, 1}; + + token_bucket.synchronizer().enable(); + // Start a thread and call consume. This will wait post lock. + token_bucket.synchronizer().waitOn(SharedTokenBucketImpl::GetImplSyncPoint); + std::thread thread([&] { EXPECT_EQ(10, token_bucket.consume(20, true)); }); + + // Wait until the thread is actually waiting. + token_bucket.synchronizer().barrierOn(SharedTokenBucketImpl::GetImplSyncPoint); + + // Mutex should be already locked. + EXPECT_TRUE(token_bucket.isMutexLocked()); + token_bucket.synchronizer().signal(SharedTokenBucketImpl::GetImplSyncPoint); + thread.join(); + EXPECT_FALSE(token_bucket.isMutexLocked()); +} + +TEST_F(SharedSharedTokenBucketImplTest, SynchronizedNextTokenAvailable) { + SharedTokenBucketImpl token_bucket{10, time_system_, 16}; + + token_bucket.synchronizer().enable(); + // Start a thread and call consume. This will wait post lock. + token_bucket.synchronizer().waitOn(SharedTokenBucketImpl::GetImplSyncPoint); + std::thread thread( + [&] { EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); }); + + // Wait until the thread is actually waiting. + token_bucket.synchronizer().barrierOn(SharedTokenBucketImpl::GetImplSyncPoint); + + // Mutex should be already locked. + EXPECT_TRUE(token_bucket.isMutexLocked()); + token_bucket.synchronizer().signal(SharedTokenBucketImpl::GetImplSyncPoint); + thread.join(); + EXPECT_FALSE(token_bucket.isMutexLocked()); +} + +} // namespace Envoy diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index e130300cbd485..8e2b6b2b27a59 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -11,14 +11,6 @@ namespace Envoy { class TokenBucketImplTest : public testing::Test { protected: Event::SimulatedTimeSystem time_system_; - - bool isMutexLocked(Thread::MutexBasicLockable& mutex) { - auto locked = mutex.tryLock(); - if (locked) { - mutex.unlock(); - } - return !locked; - } }; // Verifies TokenBucket initialization. @@ -99,75 +91,6 @@ TEST_F(TokenBucketImplTest, Reset) { token_bucket.reset(1); EXPECT_EQ(1, token_bucket.consume(2, true)); EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); - - // Reset again. Should be honored. - token_bucket.reset(5); - EXPECT_EQ(5, token_bucket.consume(5, true)); -} - -// Verifies that TokenBucket can consume tokens with thread safety. -TEST_F(TokenBucketImplTest, SharedBucketSynchronizedConsume) { - Thread::MutexBasicLockable mutex; - TokenBucketImpl token_bucket{10, time_system_, 1, &mutex}; - - token_bucket.synchronizer().enable(); - // Start a thread and call consume. This will wait post lock. - token_bucket.synchronizer().waitOn(TokenBucketImpl::MutexLockedSyncPoint); - std::thread thread([&] { EXPECT_EQ(10, token_bucket.consume(20, true)); }); - - // Wait until the thread is actually waiting. - token_bucket.synchronizer().barrierOn(TokenBucketImpl::MutexLockedSyncPoint); - - // Mutex should be already locked. - EXPECT_TRUE(isMutexLocked(mutex)); - token_bucket.synchronizer().signal(TokenBucketImpl::MutexLockedSyncPoint); - thread.join(); - EXPECT_FALSE(isMutexLocked(mutex)); -} - -TEST_F(TokenBucketImplTest, SharedBucketNextTokenAvailable) { - Thread::MutexBasicLockable mutex; - TokenBucketImpl token_bucket{10, time_system_, 16, &mutex}; - - token_bucket.synchronizer().enable(); - // Start a thread and call consume. This will wait post lock. - token_bucket.synchronizer().waitOn(TokenBucketImpl::MutexLockedSyncPoint); - std::thread thread( - [&] { EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); }); - - // Wait until the thread is actually waiting. - token_bucket.synchronizer().barrierOn(TokenBucketImpl::MutexLockedSyncPoint); - - // Mutex should be already locked. - EXPECT_TRUE(isMutexLocked(mutex)); - token_bucket.synchronizer().signal(TokenBucketImpl::MutexLockedSyncPoint); - thread.join(); - EXPECT_FALSE(isMutexLocked(mutex)); -} - -// Test reset functionality for a shared token bucket. -TEST_F(TokenBucketImplTest, SharedBucketReset) { - Thread::MutexBasicLockable mutex; - TokenBucketImpl token_bucket{16, time_system_, 16, &mutex}; - token_bucket.synchronizer().enable(); - // Start a thread and call consume. This will wait post lock. - token_bucket.synchronizer().waitOn(TokenBucketImpl::MutexLockedSyncPoint); - std::thread thread([&] { token_bucket.reset(1); }); - // Wait until the thread is actually waiting. - token_bucket.synchronizer().barrierOn(TokenBucketImpl::MutexLockedSyncPoint); - - // Mutex should be already locked. - EXPECT_TRUE(isMutexLocked(mutex)); - token_bucket.synchronizer().signal(TokenBucketImpl::MutexLockedSyncPoint); - thread.join(); - EXPECT_FALSE(isMutexLocked(mutex)); - - EXPECT_EQ(1, token_bucket.consume(2, true)); - EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); - - // Reset again. Should be ignored for shared bucket. - token_bucket.reset(5); - EXPECT_EQ(0, token_bucket.consume(5, true)); } } // namespace Envoy From 1a104edb5b8d3c1377dd1b0932b740c1700df360 Mon Sep 17 00:00:00 2001 From: Nitin Date: Tue, 26 Jan 2021 13:00:42 -0800 Subject: [PATCH 06/13] fix: format, cloud build err, test class name Signed-off-by: Nitin --- source/common/common/BUILD | 2 +- .../common/common/shared_token_bucket_impl.cc | 6 +++--- .../common/common/shared_token_bucket_impl.h | 9 ++++----- .../common/shared_token_bucket_impl_test.cc | 20 +++++++++---------- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/source/common/common/BUILD b/source/common/common/BUILD index f3b54fac567af..4867a1cb0ba8d 100644 --- a/source/common/common/BUILD +++ b/source/common/common/BUILD @@ -407,9 +407,9 @@ envoy_cc_library( hdrs = ["shared_token_bucket_impl.h"], deps = [ "//include/envoy/common:time_interface", - "//source/common/common:token_bucket_impl_lib", "//source/common/common:thread_lib", "//source/common/common:thread_synchronizer_lib", + "//source/common/common:token_bucket_impl_lib", "//source/common/common:utility_lib", ], ) diff --git a/source/common/common/shared_token_bucket_impl.cc b/source/common/common/shared_token_bucket_impl.cc index 45b047eb9fc28..8e711ea8766a2 100644 --- a/source/common/common/shared_token_bucket_impl.cc +++ b/source/common/common/shared_token_bucket_impl.cc @@ -14,20 +14,20 @@ SharedTokenBucketImpl::SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& ti : impl_(max_tokens, time_source, fill_rate), reset_once_(false) {} uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { +/*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ { Thread::LockGuard lock(mutex_); synchronizer_.syncPoint(GetImplSyncPoint); return getImpl().consume(tokens, allow_partial); }; std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable() - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { +/*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ { Thread::LockGuard lock(mutex_); synchronizer_.syncPoint(GetImplSyncPoint); return getImpl().nextTokenAvailable(); }; -void SharedTokenBucketImpl::reset(uint64_t num_tokens) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { +void SharedTokenBucketImpl::reset(uint64_t num_tokens) /*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ { Thread::LockGuard lock(mutex_); // Don't reset if reset once before. if (reset_once_) { diff --git a/source/common/common/shared_token_bucket_impl.h b/source/common/common/shared_token_bucket_impl.h index 4e4a432532234..21cc51c06b597 100644 --- a/source/common/common/shared_token_bucket_impl.h +++ b/source/common/common/shared_token_bucket_impl.h @@ -1,9 +1,8 @@ #pragma once -#include "common/common/token_bucket_impl.h" - #include "common/common/thread.h" #include "common/common/thread_synchronizer.h" +#include "common/common/token_bucket_impl.h" namespace Envoy { @@ -30,16 +29,16 @@ class SharedTokenBucketImpl : public TokenBucket { // TokenBucket uint64_t consume(uint64_t tokens, bool allow_partial) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + /*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ override; - std::chrono::milliseconds nextTokenAvailable() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + std::chrono::milliseconds nextTokenAvailable() /*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ override; /** * Resets the bucket to contain tokens equal to @param num_tokens * Since the token bucket is shared, only the first reset call will work. Subsequent calls to * reset method will be ignored. */ - void reset(uint64_t num_tokens) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) override; + void reset(uint64_t num_tokens) /*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ override; // Used only for testing. Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; }; diff --git a/test/common/common/shared_token_bucket_impl_test.cc b/test/common/common/shared_token_bucket_impl_test.cc index 50c292ac2f94e..42f0c3b216350 100644 --- a/test/common/common/shared_token_bucket_impl_test.cc +++ b/test/common/common/shared_token_bucket_impl_test.cc @@ -8,13 +8,13 @@ namespace Envoy { -class SharedSharedTokenBucketImplTest : public testing::Test { +class SharedTokenBucketImplTest : public testing::Test { protected: Event::SimulatedTimeSystem time_system_; }; // Verifies TokenBucket initialization. -TEST_F(SharedSharedTokenBucketImplTest, Initialization) { +TEST_F(SharedTokenBucketImplTest, Initialization) { SharedTokenBucketImpl token_bucket{1, time_system_, -1.0}; EXPECT_EQ(1, token_bucket.consume(1, false)); @@ -22,7 +22,7 @@ TEST_F(SharedSharedTokenBucketImplTest, Initialization) { } // Verifies TokenBucket's maximum capacity. -TEST_F(SharedSharedTokenBucketImplTest, MaxBucketSize) { +TEST_F(SharedTokenBucketImplTest, MaxBucketSize) { SharedTokenBucketImpl token_bucket{3, time_system_, 1}; EXPECT_EQ(3, token_bucket.consume(3, false)); @@ -32,7 +32,7 @@ TEST_F(SharedSharedTokenBucketImplTest, MaxBucketSize) { } // Verifies that TokenBucket can consume tokens. -TEST_F(SharedSharedTokenBucketImplTest, Consume) { +TEST_F(SharedTokenBucketImplTest, Consume) { SharedTokenBucketImpl token_bucket{10, time_system_, 1}; EXPECT_EQ(0, token_bucket.consume(20, false)); @@ -52,7 +52,7 @@ TEST_F(SharedSharedTokenBucketImplTest, Consume) { } // Verifies that TokenBucket can refill tokens. -TEST_F(SharedSharedTokenBucketImplTest, Refill) { +TEST_F(SharedTokenBucketImplTest, Refill) { SharedTokenBucketImpl token_bucket{1, time_system_, 0.5}; EXPECT_EQ(1, token_bucket.consume(1, false)); @@ -64,7 +64,7 @@ TEST_F(SharedSharedTokenBucketImplTest, Refill) { EXPECT_EQ(1, token_bucket.consume(1, false)); } -TEST_F(SharedSharedTokenBucketImplTest, NextTokenAvailable) { +TEST_F(SharedTokenBucketImplTest, NextTokenAvailable) { SharedTokenBucketImpl token_bucket{10, time_system_, 5}; EXPECT_EQ(9, token_bucket.consume(9, false)); EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); @@ -74,7 +74,7 @@ TEST_F(SharedSharedTokenBucketImplTest, NextTokenAvailable) { } // Test partial consumption of tokens. -TEST_F(SharedSharedTokenBucketImplTest, PartialConsumption) { +TEST_F(SharedTokenBucketImplTest, PartialConsumption) { SharedTokenBucketImpl token_bucket{16, time_system_, 16}; EXPECT_EQ(16, token_bucket.consume(18, true)); EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); @@ -86,7 +86,7 @@ TEST_F(SharedSharedTokenBucketImplTest, PartialConsumption) { } // Test reset functionality for a shared token bucket. -TEST_F(SharedSharedTokenBucketImplTest, Reset) { +TEST_F(SharedTokenBucketImplTest, Reset) { SharedTokenBucketImpl token_bucket{16, time_system_, 16}; token_bucket.synchronizer().enable(); // Start a thread and call consume. This will wait post checking reset_once flag. @@ -112,7 +112,7 @@ TEST_F(SharedSharedTokenBucketImplTest, Reset) { } // Verifies that TokenBucket can consume tokens with thread safety. -TEST_F(SharedSharedTokenBucketImplTest, SynchronizedConsume) { +TEST_F(SharedTokenBucketImplTest, SynchronizedConsume) { SharedTokenBucketImpl token_bucket{10, time_system_, 1}; token_bucket.synchronizer().enable(); @@ -130,7 +130,7 @@ TEST_F(SharedSharedTokenBucketImplTest, SynchronizedConsume) { EXPECT_FALSE(token_bucket.isMutexLocked()); } -TEST_F(SharedSharedTokenBucketImplTest, SynchronizedNextTokenAvailable) { +TEST_F(SharedTokenBucketImplTest, SynchronizedNextTokenAvailable) { SharedTokenBucketImpl token_bucket{10, time_system_, 16}; token_bucket.synchronizer().enable(); From d29d47a4801ffdf5f81c3f332d1e1d36ce0d5ade Mon Sep 17 00:00:00 2001 From: Nitin Date: Wed, 27 Jan 2021 16:46:03 -0800 Subject: [PATCH 07/13] address comments Signed-off-by: Nitin --- .../common/common/shared_token_bucket_impl.cc | 22 +++------ .../common/common/shared_token_bucket_impl.h | 22 +++------ .../common/shared_token_bucket_impl_test.cc | 48 ++++++++++++------- 3 files changed, 43 insertions(+), 49 deletions(-) diff --git a/source/common/common/shared_token_bucket_impl.cc b/source/common/common/shared_token_bucket_impl.cc index 8e711ea8766a2..7bdddbd69d692 100644 --- a/source/common/common/shared_token_bucket_impl.cc +++ b/source/common/common/shared_token_bucket_impl.cc @@ -13,21 +13,19 @@ SharedTokenBucketImpl::SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& ti double fill_rate) : impl_(max_tokens, time_source, fill_rate), reset_once_(false) {} -uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) -/*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ { +uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { Thread::LockGuard lock(mutex_); synchronizer_.syncPoint(GetImplSyncPoint); - return getImpl().consume(tokens, allow_partial); + return impl_.consume(tokens, allow_partial); }; -std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable() -/*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ { +std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable() { Thread::LockGuard lock(mutex_); synchronizer_.syncPoint(GetImplSyncPoint); - return getImpl().nextTokenAvailable(); + return impl_.nextTokenAvailable(); }; -void SharedTokenBucketImpl::reset(uint64_t num_tokens) /*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ { +void SharedTokenBucketImpl::reset(uint64_t num_tokens) { Thread::LockGuard lock(mutex_); // Don't reset if reset once before. if (reset_once_) { @@ -35,15 +33,7 @@ void SharedTokenBucketImpl::reset(uint64_t num_tokens) /*ABSL_EXCLUSIVE_LOCKS_RE } reset_once_ = true; synchronizer_.syncPoint(ResetCheckSyncPoint); - getImpl().reset(num_tokens); + impl_.reset(num_tokens); }; -bool SharedTokenBucketImpl::isMutexLocked() { - auto locked = mutex_.tryLock(); - if (locked) { - mutex_.unlock(); - } - return !locked; -} - } // namespace Envoy diff --git a/source/common/common/shared_token_bucket_impl.h b/source/common/common/shared_token_bucket_impl.h index 21cc51c06b597..bca249f2ad844 100644 --- a/source/common/common/shared_token_bucket_impl.h +++ b/source/common/common/shared_token_bucket_impl.h @@ -28,31 +28,23 @@ class SharedTokenBucketImpl : public TokenBucket { SharedTokenBucketImpl(SharedTokenBucketImpl&&) = delete; // TokenBucket - uint64_t consume(uint64_t tokens, bool allow_partial) - /*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ override; - std::chrono::milliseconds nextTokenAvailable() /*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ override; + uint64_t consume(uint64_t tokens, bool allow_partial) override; + + std::chrono::milliseconds nextTokenAvailable() override; /** - * Resets the bucket to contain tokens equal to @param num_tokens - * Since the token bucket is shared, only the first reset call will work. Subsequent calls to - * reset method will be ignored. + * Since the token bucket is shared, only the first reset call will work. + * Subsequent calls to reset method will be ignored. */ - void reset(uint64_t num_tokens) /*ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_)*/ override; - - // Used only for testing. - Thread::ThreadSynchronizer& synchronizer() { return synchronizer_; }; - - // Returns a flag to indicate whether mutex was in lock state. - bool isMutexLocked() ABSL_EXCLUSIVE_TRYLOCK_FUNCTION(false, mutex_); + void reset(uint64_t num_tokens) override; private: - TokenBucketImpl& getImpl() { return impl_; } - Thread::MutexBasicLockable mutex_; TokenBucketImpl impl_ ABSL_GUARDED_BY(mutex_); bool reset_once_ ABSL_GUARDED_BY(mutex_); mutable Thread::ThreadSynchronizer synchronizer_; // Used only for testing. + friend class SharedTokenBucketImplTest; }; } // namespace Envoy diff --git a/test/common/common/shared_token_bucket_impl_test.cc b/test/common/common/shared_token_bucket_impl_test.cc index 42f0c3b216350..b73e5001abbef 100644 --- a/test/common/common/shared_token_bucket_impl_test.cc +++ b/test/common/common/shared_token_bucket_impl_test.cc @@ -10,6 +10,18 @@ namespace Envoy { class SharedTokenBucketImplTest : public testing::Test { protected: + bool isMutexLocked(SharedTokenBucketImpl& token) { + auto locked = token.mutex_.tryLock(); + if (locked) { + token.mutex_.unlock(); + } + return !locked; + } + + Thread::ThreadSynchronizer& synchronizer(SharedTokenBucketImpl& token) { + return token.synchronizer_; + }; + Event::SimulatedTimeSystem time_system_; }; @@ -88,20 +100,20 @@ TEST_F(SharedTokenBucketImplTest, PartialConsumption) { // Test reset functionality for a shared token bucket. TEST_F(SharedTokenBucketImplTest, Reset) { SharedTokenBucketImpl token_bucket{16, time_system_, 16}; - token_bucket.synchronizer().enable(); + synchronizer(token_bucket).enable(); // Start a thread and call consume. This will wait post checking reset_once flag. - token_bucket.synchronizer().waitOn(SharedTokenBucketImpl::ResetCheckSyncPoint); + synchronizer(token_bucket).waitOn(SharedTokenBucketImpl::ResetCheckSyncPoint); std::thread thread([&] { token_bucket.reset(1); }); // Wait until the thread is actually waiting. - token_bucket.synchronizer().barrierOn(SharedTokenBucketImpl::ResetCheckSyncPoint); + synchronizer(token_bucket).barrierOn(SharedTokenBucketImpl::ResetCheckSyncPoint); // Mutex should be already locked. - EXPECT_TRUE(token_bucket.isMutexLocked()); - token_bucket.synchronizer().signal(SharedTokenBucketImpl::ResetCheckSyncPoint); + EXPECT_TRUE(isMutexLocked(token_bucket)); + synchronizer(token_bucket).signal(SharedTokenBucketImpl::ResetCheckSyncPoint); thread.join(); - EXPECT_FALSE(token_bucket.isMutexLocked()); + EXPECT_FALSE(isMutexLocked(token_bucket)); EXPECT_EQ(1, token_bucket.consume(2, true)); EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); @@ -115,38 +127,38 @@ TEST_F(SharedTokenBucketImplTest, Reset) { TEST_F(SharedTokenBucketImplTest, SynchronizedConsume) { SharedTokenBucketImpl token_bucket{10, time_system_, 1}; - token_bucket.synchronizer().enable(); + synchronizer(token_bucket).enable(); // Start a thread and call consume. This will wait post lock. - token_bucket.synchronizer().waitOn(SharedTokenBucketImpl::GetImplSyncPoint); + synchronizer(token_bucket).waitOn(SharedTokenBucketImpl::GetImplSyncPoint); std::thread thread([&] { EXPECT_EQ(10, token_bucket.consume(20, true)); }); // Wait until the thread is actually waiting. - token_bucket.synchronizer().barrierOn(SharedTokenBucketImpl::GetImplSyncPoint); + synchronizer(token_bucket).barrierOn(SharedTokenBucketImpl::GetImplSyncPoint); // Mutex should be already locked. - EXPECT_TRUE(token_bucket.isMutexLocked()); - token_bucket.synchronizer().signal(SharedTokenBucketImpl::GetImplSyncPoint); + EXPECT_TRUE(isMutexLocked(token_bucket)); + synchronizer(token_bucket).signal(SharedTokenBucketImpl::GetImplSyncPoint); thread.join(); - EXPECT_FALSE(token_bucket.isMutexLocked()); + EXPECT_FALSE(isMutexLocked(token_bucket)); } TEST_F(SharedTokenBucketImplTest, SynchronizedNextTokenAvailable) { SharedTokenBucketImpl token_bucket{10, time_system_, 16}; - token_bucket.synchronizer().enable(); + synchronizer(token_bucket).enable(); // Start a thread and call consume. This will wait post lock. - token_bucket.synchronizer().waitOn(SharedTokenBucketImpl::GetImplSyncPoint); + synchronizer(token_bucket).waitOn(SharedTokenBucketImpl::GetImplSyncPoint); std::thread thread( [&] { EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); }); // Wait until the thread is actually waiting. - token_bucket.synchronizer().barrierOn(SharedTokenBucketImpl::GetImplSyncPoint); + synchronizer(token_bucket).barrierOn(SharedTokenBucketImpl::GetImplSyncPoint); // Mutex should be already locked. - EXPECT_TRUE(token_bucket.isMutexLocked()); - token_bucket.synchronizer().signal(SharedTokenBucketImpl::GetImplSyncPoint); + EXPECT_TRUE(isMutexLocked(token_bucket)); + synchronizer(token_bucket).signal(SharedTokenBucketImpl::GetImplSyncPoint); thread.join(); - EXPECT_FALSE(token_bucket.isMutexLocked()); + EXPECT_FALSE(isMutexLocked(token_bucket)); } } // namespace Envoy From 383fc7354192d626d687e266080f8cc33dd29b48 Mon Sep 17 00:00:00 2001 From: Nitin Date: Thu, 4 Feb 2021 13:54:30 -0800 Subject: [PATCH 08/13] add combined consume and next token method Signed-off-by: Nitin --- include/envoy/common/token_bucket.h | 12 +++++++ .../common/common/shared_token_bucket_impl.cc | 7 ++++ .../common/common/shared_token_bucket_impl.h | 3 +- source/common/common/token_bucket_impl.cc | 7 ++++ source/common/common/token_bucket_impl.h | 1 + .../common/shared_token_bucket_impl_test.cc | 34 +++++++++++++++++++ test/common/common/token_bucket_impl_test.cc | 17 ++++++++++ 7 files changed, 80 insertions(+), 1 deletion(-) diff --git a/include/envoy/common/token_bucket.h b/include/envoy/common/token_bucket.h index f3db6a884154f..bd362ad6f66db 100644 --- a/include/envoy/common/token_bucket.h +++ b/include/envoy/common/token_bucket.h @@ -27,6 +27,18 @@ class TokenBucket { */ virtual uint64_t consume(uint64_t tokens, bool allow_partial) PURE; + /** + * @param tokens supplies the number of tokens to be consumed. + * @param allow_partial supplies whether the token bucket will allow consumption of less tokens + * than asked for. If allow_partial is true, the bucket contains 3 tokens, + * and the caller asks for 5, the bucket will return 3 tokens and now be + * empty. + * @param timeToNextToken out param indicating the approx time until next token is available. + * @return the number of tokens actually consumed. + */ + virtual uint64_t consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& timeToNextToken) PURE; + /** * @return returns the approximate time until a next token is available. Currently it * returns the upper bound on the amount of time until a next token is available. diff --git a/source/common/common/shared_token_bucket_impl.cc b/source/common/common/shared_token_bucket_impl.cc index 7bdddbd69d692..ef3315d335638 100644 --- a/source/common/common/shared_token_bucket_impl.cc +++ b/source/common/common/shared_token_bucket_impl.cc @@ -19,6 +19,13 @@ uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { return impl_.consume(tokens, allow_partial); }; +uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& timeToNextToken) { + Thread::LockGuard lock(mutex_); + synchronizer_.syncPoint(GetImplSyncPoint); + return impl_.consume(tokens, allow_partial, timeToNextToken); +}; + std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable() { Thread::LockGuard lock(mutex_); synchronizer_.syncPoint(GetImplSyncPoint); diff --git a/source/common/common/shared_token_bucket_impl.h b/source/common/common/shared_token_bucket_impl.h index bca249f2ad844..6e33de3de9854 100644 --- a/source/common/common/shared_token_bucket_impl.h +++ b/source/common/common/shared_token_bucket_impl.h @@ -30,7 +30,8 @@ class SharedTokenBucketImpl : public TokenBucket { // TokenBucket uint64_t consume(uint64_t tokens, bool allow_partial) override; - + uint64_t consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& timeToNextToken) override; std::chrono::milliseconds nextTokenAvailable() override; /** diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 5e7de9e6bb1a7..398c2c7d0c1f5 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -29,6 +29,13 @@ uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { return tokens; } +uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& timeToNextToken) { + auto tokens_consumed = consume(tokens, allow_partial); + timeToNextToken = nextTokenAvailable(); + return tokens_consumed; +} + std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { // If there are tokens available, return immediately. if (tokens_ >= 1) { diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index 644a4185dd5ab..fef2857ba5674 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -22,6 +22,7 @@ class TokenBucketImpl : public TokenBucket { // TokenBucket uint64_t consume(uint64_t tokens, bool allow_partial) override; + uint64_t consume(uint64_t tokens, bool allow_partial, std::chrono::milliseconds& timeToNextToken); std::chrono::milliseconds nextTokenAvailable() override; void reset(uint64_t num_tokens) override; diff --git a/test/common/common/shared_token_bucket_impl_test.cc b/test/common/common/shared_token_bucket_impl_test.cc index b73e5001abbef..cffe45ae81b68 100644 --- a/test/common/common/shared_token_bucket_impl_test.cc +++ b/test/common/common/shared_token_bucket_impl_test.cc @@ -161,4 +161,38 @@ TEST_F(SharedTokenBucketImplTest, SynchronizedNextTokenAvailable) { EXPECT_FALSE(isMutexLocked(token_bucket)); } +// Verifies that TokenBucket can consume tokens with thread safety. +TEST_F(SharedTokenBucketImplTest, SynchronizedConsumeAndNextToken) { + SharedTokenBucketImpl token_bucket{10, time_system_, 5}; + + // Exhaust all tokens. + EXPECT_EQ(10, token_bucket.consume(20, true)); + EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); + time_system_.advanceTimeWait(std::chrono::milliseconds(400)); + + // Start a thread and call consume to refill tokens. + std::thread t1([&] { EXPECT_EQ(1, token_bucket.consume(1, false)); }); + + t1.join(); + + EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); + + token_bucket.reset(10); + // Exhaust all tokens. + std::chrono::milliseconds timeToNextToken(0); + EXPECT_EQ(10, token_bucket.consume(20, true, timeToNextToken)); + EXPECT_EQ(timeToNextToken.count(), 200); + time_system_.advanceTimeWait(std::chrono::milliseconds(400)); + + // Start a thread and call consume to refill tokens. + std::thread t2([&] { + EXPECT_EQ(1, token_bucket.consume(1, false, timeToNextToken)); + EXPECT_EQ(timeToNextToken.count(), 0); + }); + + t2.join(); + + EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); +} + } // namespace Envoy diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index 8e2b6b2b27a59..b8ba67185c5d2 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -93,4 +93,21 @@ TEST_F(TokenBucketImplTest, Reset) { EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); } +// Verifies that TokenBucket can consume tokens and return next token time. +TEST_F(TokenBucketImplTest, ConsumeAndNextToken) { + TokenBucketImpl token_bucket{10, time_system_, 5}; + + // Exhaust all tokens. + std::chrono::milliseconds timeToNextToken(0); + EXPECT_EQ(10, token_bucket.consume(20, true, timeToNextToken)); + EXPECT_EQ(timeToNextToken.count(), 200); + EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); + time_system_.advanceTimeWait(std::chrono::milliseconds(400)); + + // Start a thread and call consume to refill tokens. + EXPECT_EQ(1, token_bucket.consume(1, false, timeToNextToken)); + EXPECT_EQ(timeToNextToken.count(), 0); + EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); +} + } // namespace Envoy From 5b0cb4c2b68956b329f670a0ca19bfbf936bc3d8 Mon Sep 17 00:00:00 2001 From: Nitin Date: Thu, 4 Feb 2021 14:16:30 -0800 Subject: [PATCH 09/13] build fix Signed-off-by: Nitin --- source/common/common/token_bucket_impl.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index fef2857ba5674..0b73c36611e81 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -22,7 +22,8 @@ class TokenBucketImpl : public TokenBucket { // TokenBucket uint64_t consume(uint64_t tokens, bool allow_partial) override; - uint64_t consume(uint64_t tokens, bool allow_partial, std::chrono::milliseconds& timeToNextToken); + uint64_t consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& timeToNextToken) override; std::chrono::milliseconds nextTokenAvailable() override; void reset(uint64_t num_tokens) override; From 4d7b7d157f3036626c086744e093b9b855577ad9 Mon Sep 17 00:00:00 2001 From: Nitin Date: Thu, 4 Feb 2021 15:57:00 -0800 Subject: [PATCH 10/13] clang tidy fix Signed-off-by: Nitin --- include/envoy/common/token_bucket.h | 4 ++-- source/common/common/shared_token_bucket_impl.cc | 4 ++-- source/common/common/shared_token_bucket_impl.h | 2 +- source/common/common/token_bucket_impl.cc | 4 ++-- source/common/common/token_bucket_impl.h | 2 +- test/common/common/shared_token_bucket_impl_test.cc | 10 +++++----- test/common/common/token_bucket_impl_test.cc | 10 +++++----- 7 files changed, 18 insertions(+), 18 deletions(-) diff --git a/include/envoy/common/token_bucket.h b/include/envoy/common/token_bucket.h index bd362ad6f66db..834b99a9dae73 100644 --- a/include/envoy/common/token_bucket.h +++ b/include/envoy/common/token_bucket.h @@ -33,11 +33,11 @@ class TokenBucket { * than asked for. If allow_partial is true, the bucket contains 3 tokens, * and the caller asks for 5, the bucket will return 3 tokens and now be * empty. - * @param timeToNextToken out param indicating the approx time until next token is available. + * @param time_to_next_token out param indicating the approx time until next token is available. * @return the number of tokens actually consumed. */ virtual uint64_t consume(uint64_t tokens, bool allow_partial, - std::chrono::milliseconds& timeToNextToken) PURE; + std::chrono::milliseconds& time_to_next_token) PURE; /** * @return returns the approximate time until a next token is available. Currently it diff --git a/source/common/common/shared_token_bucket_impl.cc b/source/common/common/shared_token_bucket_impl.cc index ef3315d335638..ebec0647589e1 100644 --- a/source/common/common/shared_token_bucket_impl.cc +++ b/source/common/common/shared_token_bucket_impl.cc @@ -20,10 +20,10 @@ uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { }; uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial, - std::chrono::milliseconds& timeToNextToken) { + std::chrono::milliseconds& time_to_next_token) { Thread::LockGuard lock(mutex_); synchronizer_.syncPoint(GetImplSyncPoint); - return impl_.consume(tokens, allow_partial, timeToNextToken); + return impl_.consume(tokens, allow_partial, time_to_next_token); }; std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable() { diff --git a/source/common/common/shared_token_bucket_impl.h b/source/common/common/shared_token_bucket_impl.h index 6e33de3de9854..9df0397dcd606 100644 --- a/source/common/common/shared_token_bucket_impl.h +++ b/source/common/common/shared_token_bucket_impl.h @@ -31,7 +31,7 @@ class SharedTokenBucketImpl : public TokenBucket { uint64_t consume(uint64_t tokens, bool allow_partial) override; uint64_t consume(uint64_t tokens, bool allow_partial, - std::chrono::milliseconds& timeToNextToken) override; + std::chrono::milliseconds& time_to_next_token) override; std::chrono::milliseconds nextTokenAvailable() override; /** diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 398c2c7d0c1f5..c038306c5403c 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -30,9 +30,9 @@ uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { } uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial, - std::chrono::milliseconds& timeToNextToken) { + std::chrono::milliseconds& time_to_next_token) { auto tokens_consumed = consume(tokens, allow_partial); - timeToNextToken = nextTokenAvailable(); + time_to_next_token = nextTokenAvailable(); return tokens_consumed; } diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index 0b73c36611e81..56315549bc58c 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -23,7 +23,7 @@ class TokenBucketImpl : public TokenBucket { // TokenBucket uint64_t consume(uint64_t tokens, bool allow_partial) override; uint64_t consume(uint64_t tokens, bool allow_partial, - std::chrono::milliseconds& timeToNextToken) override; + std::chrono::milliseconds& time_to_next_token) override; std::chrono::milliseconds nextTokenAvailable() override; void reset(uint64_t num_tokens) override; diff --git a/test/common/common/shared_token_bucket_impl_test.cc b/test/common/common/shared_token_bucket_impl_test.cc index cffe45ae81b68..6b7abc541aeb2 100644 --- a/test/common/common/shared_token_bucket_impl_test.cc +++ b/test/common/common/shared_token_bucket_impl_test.cc @@ -179,15 +179,15 @@ TEST_F(SharedTokenBucketImplTest, SynchronizedConsumeAndNextToken) { token_bucket.reset(10); // Exhaust all tokens. - std::chrono::milliseconds timeToNextToken(0); - EXPECT_EQ(10, token_bucket.consume(20, true, timeToNextToken)); - EXPECT_EQ(timeToNextToken.count(), 200); + std::chrono::milliseconds time_to_next_token(0); + EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); + EXPECT_EQ(time_to_next_token.count(), 200); time_system_.advanceTimeWait(std::chrono::milliseconds(400)); // Start a thread and call consume to refill tokens. std::thread t2([&] { - EXPECT_EQ(1, token_bucket.consume(1, false, timeToNextToken)); - EXPECT_EQ(timeToNextToken.count(), 0); + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + EXPECT_EQ(time_to_next_token.count(), 0); }); t2.join(); diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index b8ba67185c5d2..e1d560488aebc 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -98,15 +98,15 @@ TEST_F(TokenBucketImplTest, ConsumeAndNextToken) { TokenBucketImpl token_bucket{10, time_system_, 5}; // Exhaust all tokens. - std::chrono::milliseconds timeToNextToken(0); - EXPECT_EQ(10, token_bucket.consume(20, true, timeToNextToken)); - EXPECT_EQ(timeToNextToken.count(), 200); + std::chrono::milliseconds time_to_next_token(0); + EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); + EXPECT_EQ(time_to_next_token.count(), 200); EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); time_system_.advanceTimeWait(std::chrono::milliseconds(400)); // Start a thread and call consume to refill tokens. - EXPECT_EQ(1, token_bucket.consume(1, false, timeToNextToken)); - EXPECT_EQ(timeToNextToken.count(), 0); + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + EXPECT_EQ(time_to_next_token.count(), 0); EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); } From 1f4e32625c04ac7bb40484472d43d72936bfa181 Mon Sep 17 00:00:00 2001 From: Nitin Date: Fri, 5 Feb 2021 13:26:06 -0800 Subject: [PATCH 11/13] test new interface by default Signed-off-by: Nitin --- .../common/shared_token_bucket_impl_test.cc | 55 ++++++++++--------- test/common/common/token_bucket_impl_test.cc | 4 +- 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/test/common/common/shared_token_bucket_impl_test.cc b/test/common/common/shared_token_bucket_impl_test.cc index 6b7abc541aeb2..e55bd7a446de5 100644 --- a/test/common/common/shared_token_bucket_impl_test.cc +++ b/test/common/common/shared_token_bucket_impl_test.cc @@ -23,77 +23,78 @@ class SharedTokenBucketImplTest : public testing::Test { }; Event::SimulatedTimeSystem time_system_; + std::chrono::milliseconds time_to_next_token; }; // Verifies TokenBucket initialization. TEST_F(SharedTokenBucketImplTest, Initialization) { SharedTokenBucketImpl token_bucket{1, time_system_, -1.0}; - EXPECT_EQ(1, token_bucket.consume(1, false)); - EXPECT_EQ(0, token_bucket.consume(1, false)); + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); } // Verifies TokenBucket's maximum capacity. TEST_F(SharedTokenBucketImplTest, MaxBucketSize) { SharedTokenBucketImpl token_bucket{3, time_system_, 1}; - EXPECT_EQ(3, token_bucket.consume(3, false)); + EXPECT_EQ(3, token_bucket.consume(3, false, time_to_next_token)); time_system_.setMonotonicTime(std::chrono::seconds(10)); - EXPECT_EQ(0, token_bucket.consume(4, false)); - EXPECT_EQ(3, token_bucket.consume(3, false)); + EXPECT_EQ(0, token_bucket.consume(4, false, time_to_next_token)); + EXPECT_EQ(3, token_bucket.consume(3, false, time_to_next_token)); } // Verifies that TokenBucket can consume tokens. TEST_F(SharedTokenBucketImplTest, Consume) { SharedTokenBucketImpl token_bucket{10, time_system_, 1}; - EXPECT_EQ(0, token_bucket.consume(20, false)); - EXPECT_EQ(9, token_bucket.consume(9, false)); + EXPECT_EQ(0, token_bucket.consume(20, false, time_to_next_token)); + EXPECT_EQ(9, token_bucket.consume(9, false, time_to_next_token)); - EXPECT_EQ(1, token_bucket.consume(1, false)); + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); time_system_.setMonotonicTime(std::chrono::milliseconds(999)); - EXPECT_EQ(0, token_bucket.consume(1, false)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); time_system_.setMonotonicTime(std::chrono::milliseconds(5999)); - EXPECT_EQ(0, token_bucket.consume(6, false)); + EXPECT_EQ(0, token_bucket.consume(6, false, time_to_next_token)); time_system_.setMonotonicTime(std::chrono::milliseconds(6000)); - EXPECT_EQ(6, token_bucket.consume(6, false)); - EXPECT_EQ(0, token_bucket.consume(1, false)); + EXPECT_EQ(6, token_bucket.consume(6, false, time_to_next_token)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); } // Verifies that TokenBucket can refill tokens. TEST_F(SharedTokenBucketImplTest, Refill) { SharedTokenBucketImpl token_bucket{1, time_system_, 0.5}; - EXPECT_EQ(1, token_bucket.consume(1, false)); + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); time_system_.setMonotonicTime(std::chrono::milliseconds(500)); - EXPECT_EQ(0, token_bucket.consume(1, false)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); time_system_.setMonotonicTime(std::chrono::milliseconds(1500)); - EXPECT_EQ(0, token_bucket.consume(1, false)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); time_system_.setMonotonicTime(std::chrono::milliseconds(2000)); - EXPECT_EQ(1, token_bucket.consume(1, false)); + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); } TEST_F(SharedTokenBucketImplTest, NextTokenAvailable) { SharedTokenBucketImpl token_bucket{10, time_system_, 5}; - EXPECT_EQ(9, token_bucket.consume(9, false)); + EXPECT_EQ(9, token_bucket.consume(9, false, time_to_next_token)); EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); - EXPECT_EQ(1, token_bucket.consume(1, false)); - EXPECT_EQ(0, token_bucket.consume(1, false)); + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); } // Test partial consumption of tokens. TEST_F(SharedTokenBucketImplTest, PartialConsumption) { SharedTokenBucketImpl token_bucket{16, time_system_, 16}; - EXPECT_EQ(16, token_bucket.consume(18, true)); + EXPECT_EQ(16, token_bucket.consume(18, true, time_to_next_token)); EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); time_system_.advanceTimeWait(std::chrono::milliseconds(62)); - EXPECT_EQ(0, token_bucket.consume(1, true)); + EXPECT_EQ(0, token_bucket.consume(1, true, time_to_next_token)); time_system_.advanceTimeWait(std::chrono::milliseconds(1)); - EXPECT_EQ(1, token_bucket.consume(2, true)); + EXPECT_EQ(1, token_bucket.consume(2, true, time_to_next_token)); EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); } @@ -115,12 +116,12 @@ TEST_F(SharedTokenBucketImplTest, Reset) { thread.join(); EXPECT_FALSE(isMutexLocked(token_bucket)); - EXPECT_EQ(1, token_bucket.consume(2, true)); + EXPECT_EQ(1, token_bucket.consume(2, true, time_to_next_token)); EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); // Reset again. Should be ignored for shared bucket. token_bucket.reset(5); - EXPECT_EQ(0, token_bucket.consume(5, true)); + EXPECT_EQ(0, token_bucket.consume(5, true, time_to_next_token)); } // Verifies that TokenBucket can consume tokens with thread safety. @@ -130,7 +131,7 @@ TEST_F(SharedTokenBucketImplTest, SynchronizedConsume) { synchronizer(token_bucket).enable(); // Start a thread and call consume. This will wait post lock. synchronizer(token_bucket).waitOn(SharedTokenBucketImpl::GetImplSyncPoint); - std::thread thread([&] { EXPECT_EQ(10, token_bucket.consume(20, true)); }); + std::thread thread([&] { EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); }); // Wait until the thread is actually waiting. synchronizer(token_bucket).barrierOn(SharedTokenBucketImpl::GetImplSyncPoint); @@ -166,12 +167,12 @@ TEST_F(SharedTokenBucketImplTest, SynchronizedConsumeAndNextToken) { SharedTokenBucketImpl token_bucket{10, time_system_, 5}; // Exhaust all tokens. - EXPECT_EQ(10, token_bucket.consume(20, true)); + EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); time_system_.advanceTimeWait(std::chrono::milliseconds(400)); // Start a thread and call consume to refill tokens. - std::thread t1([&] { EXPECT_EQ(1, token_bucket.consume(1, false)); }); + std::thread t1([&] { EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); }); t1.join(); diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index e1d560488aebc..3bc61bcb53753 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -101,13 +101,13 @@ TEST_F(TokenBucketImplTest, ConsumeAndNextToken) { std::chrono::milliseconds time_to_next_token(0); EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); EXPECT_EQ(time_to_next_token.count(), 200); - EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); + EXPECT_EQ(time_to_next_token, token_bucket.nextTokenAvailable()); time_system_.advanceTimeWait(std::chrono::milliseconds(400)); // Start a thread and call consume to refill tokens. EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); EXPECT_EQ(time_to_next_token.count(), 0); - EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); + EXPECT_EQ(time_to_next_token, token_bucket.nextTokenAvailable()); } } // namespace Envoy From 14146925a27075dd4854b5f96eb6143cb508799f Mon Sep 17 00:00:00 2001 From: Nitin Goyal Date: Wed, 10 Feb 2021 19:06:56 -0800 Subject: [PATCH 12/13] rename reset method Signed-off-by: Nitin Goyal --- include/envoy/common/token_bucket.h | 4 +++- source/common/common/shared_token_bucket_impl.cc | 4 ++-- source/common/common/shared_token_bucket_impl.h | 2 +- source/common/common/token_bucket_impl.cc | 2 +- source/common/common/token_bucket_impl.h | 2 +- source/extensions/filters/http/fault/fault_filter.cc | 2 +- test/common/common/shared_token_bucket_impl_test.cc | 6 +++--- test/common/common/token_bucket_impl_test.cc | 2 +- 8 files changed, 13 insertions(+), 11 deletions(-) diff --git a/include/envoy/common/token_bucket.h b/include/envoy/common/token_bucket.h index 834b99a9dae73..396fbcfcae014 100644 --- a/include/envoy/common/token_bucket.h +++ b/include/envoy/common/token_bucket.h @@ -48,8 +48,10 @@ class TokenBucket { /** * Reset the bucket with a specific number of tokens. Refill will begin again from the time that * this routine is called. + * Note: The reset call might not be honored only the first time this method is called. Check the + * concrete implementation to confirm. */ - virtual void reset(uint64_t num_tokens) PURE; + virtual void maybeReset(uint64_t num_tokens) PURE; }; using TokenBucketPtr = std::unique_ptr; diff --git a/source/common/common/shared_token_bucket_impl.cc b/source/common/common/shared_token_bucket_impl.cc index ebec0647589e1..50a5e02ec582e 100644 --- a/source/common/common/shared_token_bucket_impl.cc +++ b/source/common/common/shared_token_bucket_impl.cc @@ -32,7 +32,7 @@ std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable() { return impl_.nextTokenAvailable(); }; -void SharedTokenBucketImpl::reset(uint64_t num_tokens) { +void SharedTokenBucketImpl::maybeReset(uint64_t num_tokens) { Thread::LockGuard lock(mutex_); // Don't reset if reset once before. if (reset_once_) { @@ -40,7 +40,7 @@ void SharedTokenBucketImpl::reset(uint64_t num_tokens) { } reset_once_ = true; synchronizer_.syncPoint(ResetCheckSyncPoint); - impl_.reset(num_tokens); + impl_.maybeReset(num_tokens); }; } // namespace Envoy diff --git a/source/common/common/shared_token_bucket_impl.h b/source/common/common/shared_token_bucket_impl.h index 9df0397dcd606..7b677e543134d 100644 --- a/source/common/common/shared_token_bucket_impl.h +++ b/source/common/common/shared_token_bucket_impl.h @@ -38,7 +38,7 @@ class SharedTokenBucketImpl : public TokenBucket { * Since the token bucket is shared, only the first reset call will work. * Subsequent calls to reset method will be ignored. */ - void reset(uint64_t num_tokens) override; + void maybeReset(uint64_t num_tokens) override; private: Thread::MutexBasicLockable mutex_; diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index c038306c5403c..74dd981e1828d 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -45,7 +45,7 @@ std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { return std::chrono::milliseconds(static_cast(std::ceil((1 / fill_rate_) * 1000))); } -void TokenBucketImpl::reset(uint64_t num_tokens) { +void TokenBucketImpl::maybeReset(uint64_t num_tokens) { ASSERT(num_tokens <= max_tokens_); tokens_ = num_tokens; last_fill_ = time_source_.monotonicTime(); diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index 56315549bc58c..b168b4cc2894e 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -25,7 +25,7 @@ class TokenBucketImpl : public TokenBucket { uint64_t consume(uint64_t tokens, bool allow_partial, std::chrono::milliseconds& time_to_next_token) override; std::chrono::milliseconds nextTokenAvailable() override; - void reset(uint64_t num_tokens) override; + void maybeReset(uint64_t num_tokens) override; private: const double max_tokens_; diff --git a/source/extensions/filters/http/fault/fault_filter.cc b/source/extensions/filters/http/fault/fault_filter.cc index 4b98e5583a89c..49da6059b1b83 100644 --- a/source/extensions/filters/http/fault/fault_filter.cc +++ b/source/extensions/filters/http/fault/fault_filter.cc @@ -539,7 +539,7 @@ void StreamRateLimiter::onTokenTimer() { // full 1s of data right away which might not introduce enough delay for a stream that doesn't // have enough data to span more than 1s of rate allowance). Once we reset, we will subsequently // allow for bursting within the second to account for our data provider being bursty. - token_bucket_.reset(1); + token_bucket_.maybeReset(1); saw_data_ = true; } diff --git a/test/common/common/shared_token_bucket_impl_test.cc b/test/common/common/shared_token_bucket_impl_test.cc index e55bd7a446de5..2a14019bdfc7d 100644 --- a/test/common/common/shared_token_bucket_impl_test.cc +++ b/test/common/common/shared_token_bucket_impl_test.cc @@ -104,7 +104,7 @@ TEST_F(SharedTokenBucketImplTest, Reset) { synchronizer(token_bucket).enable(); // Start a thread and call consume. This will wait post checking reset_once flag. synchronizer(token_bucket).waitOn(SharedTokenBucketImpl::ResetCheckSyncPoint); - std::thread thread([&] { token_bucket.reset(1); }); + std::thread thread([&] { token_bucket.maybeReset(1); }); // Wait until the thread is actually waiting. synchronizer(token_bucket).barrierOn(SharedTokenBucketImpl::ResetCheckSyncPoint); @@ -120,7 +120,7 @@ TEST_F(SharedTokenBucketImplTest, Reset) { EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); // Reset again. Should be ignored for shared bucket. - token_bucket.reset(5); + token_bucket.maybeReset(5); EXPECT_EQ(0, token_bucket.consume(5, true, time_to_next_token)); } @@ -178,7 +178,7 @@ TEST_F(SharedTokenBucketImplTest, SynchronizedConsumeAndNextToken) { EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); - token_bucket.reset(10); + token_bucket.maybeReset(10); // Exhaust all tokens. std::chrono::milliseconds time_to_next_token(0); EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index 3bc61bcb53753..30e273c024075 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -88,7 +88,7 @@ TEST_F(TokenBucketImplTest, PartialConsumption) { // Test reset functionality. TEST_F(TokenBucketImplTest, Reset) { TokenBucketImpl token_bucket{16, time_system_, 16}; - token_bucket.reset(1); + token_bucket.maybeReset(1); EXPECT_EQ(1, token_bucket.consume(2, true)); EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); } From 606d8b9ce67748e90943c6fd4f15fd2c59df54c2 Mon Sep 17 00:00:00 2001 From: Nitin Goyal Date: Thu, 11 Feb 2021 13:35:47 -0800 Subject: [PATCH 13/13] minor: wording Signed-off-by: Nitin Goyal --- include/envoy/common/token_bucket.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/envoy/common/token_bucket.h b/include/envoy/common/token_bucket.h index 396fbcfcae014..6b612f593a550 100644 --- a/include/envoy/common/token_bucket.h +++ b/include/envoy/common/token_bucket.h @@ -48,7 +48,7 @@ class TokenBucket { /** * Reset the bucket with a specific number of tokens. Refill will begin again from the time that * this routine is called. - * Note: The reset call might not be honored only the first time this method is called. Check the + * Note: The reset call might be honored only the first time this method is called. Check the * concrete implementation to confirm. */ virtual void maybeReset(uint64_t num_tokens) PURE;