diff --git a/include/envoy/common/token_bucket.h b/include/envoy/common/token_bucket.h index f3db6a884154f..6b612f593a550 100644 --- a/include/envoy/common/token_bucket.h +++ b/include/envoy/common/token_bucket.h @@ -27,6 +27,18 @@ class TokenBucket { */ virtual uint64_t consume(uint64_t tokens, bool allow_partial) PURE; + /** + * @param tokens supplies the number of tokens to be consumed. + * @param allow_partial supplies whether the token bucket will allow consumption of less tokens + * than asked for. If allow_partial is true, the bucket contains 3 tokens, + * and the caller asks for 5, the bucket will return 3 tokens and now be + * empty. + * @param time_to_next_token out param indicating the approx time until next token is available. + * @return the number of tokens actually consumed. + */ + virtual uint64_t consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& time_to_next_token) PURE; + /** * @return returns the approximate time until a next token is available. Currently it * returns the upper bound on the amount of time until a next token is available. @@ -36,8 +48,10 @@ class TokenBucket { /** * Reset the bucket with a specific number of tokens. Refill will begin again from the time that * this routine is called. + * Note: The reset call might be honored only the first time this method is called. Check the + * concrete implementation to confirm. */ - virtual void reset(uint64_t num_tokens) PURE; + virtual void maybeReset(uint64_t num_tokens) PURE; }; using TokenBucketPtr = std::unique_ptr; diff --git a/source/common/common/BUILD b/source/common/common/BUILD index bf7701a21fa88..ac367a1fb5b1e 100644 --- a/source/common/common/BUILD +++ b/source/common/common/BUILD @@ -407,6 +407,19 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "shared_token_bucket_impl_lib", + srcs = ["shared_token_bucket_impl.cc"], + hdrs = ["shared_token_bucket_impl.h"], + deps = [ + "//include/envoy/common:time_interface", + "//source/common/common:thread_lib", + "//source/common/common:thread_synchronizer_lib", + "//source/common/common:token_bucket_impl_lib", + "//source/common/common:utility_lib", + ], +) + envoy_cc_library( name = "statusor_lib", hdrs = ["statusor.h"], diff --git a/source/common/common/shared_token_bucket_impl.cc b/source/common/common/shared_token_bucket_impl.cc new file mode 100644 index 0000000000000..50a5e02ec582e --- /dev/null +++ b/source/common/common/shared_token_bucket_impl.cc @@ -0,0 +1,46 @@ +#include "common/common/shared_token_bucket_impl.h" + +#include + +#include "common/common/lock_guard.h" + +namespace Envoy { + +const char SharedTokenBucketImpl::GetImplSyncPoint[] = "pre_get_impl"; +const char SharedTokenBucketImpl::ResetCheckSyncPoint[] = "post_reset_check"; + +SharedTokenBucketImpl::SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, + double fill_rate) + : impl_(max_tokens, time_source, fill_rate), reset_once_(false) {} + +uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { + Thread::LockGuard lock(mutex_); + synchronizer_.syncPoint(GetImplSyncPoint); + return impl_.consume(tokens, allow_partial); +}; + +uint64_t SharedTokenBucketImpl::consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& time_to_next_token) { + Thread::LockGuard lock(mutex_); + synchronizer_.syncPoint(GetImplSyncPoint); + return impl_.consume(tokens, allow_partial, time_to_next_token); +}; + +std::chrono::milliseconds SharedTokenBucketImpl::nextTokenAvailable() { + Thread::LockGuard lock(mutex_); + synchronizer_.syncPoint(GetImplSyncPoint); + return impl_.nextTokenAvailable(); +}; + +void SharedTokenBucketImpl::maybeReset(uint64_t num_tokens) { + Thread::LockGuard lock(mutex_); + // Don't reset if reset once before. + if (reset_once_) { + return; + } + reset_once_ = true; + synchronizer_.syncPoint(ResetCheckSyncPoint); + impl_.maybeReset(num_tokens); +}; + +} // namespace Envoy diff --git a/source/common/common/shared_token_bucket_impl.h b/source/common/common/shared_token_bucket_impl.h new file mode 100644 index 0000000000000..7b677e543134d --- /dev/null +++ b/source/common/common/shared_token_bucket_impl.h @@ -0,0 +1,51 @@ +#pragma once + +#include "common/common/thread.h" +#include "common/common/thread_synchronizer.h" +#include "common/common/token_bucket_impl.h" + +namespace Envoy { + +/** + * A thread-safe wrapper class for TokenBucket interface. + */ +class SharedTokenBucketImpl : public TokenBucket { +public: + static const char GetImplSyncPoint[]; + static const char ResetCheckSyncPoint[]; + /** + * @param max_tokens supplies the maximum number of tokens in the bucket. + * @param time_source supplies the time source. + * @param fill_rate supplies the number of tokens that will return to the bucket on each second. + * The default is 1. + * @param mutex supplies the mutex object to be used to ensure thread-safety when the token bucket + * is shared. By default the class will be thread-unsafe. + */ + explicit SharedTokenBucketImpl(uint64_t max_tokens, TimeSource& time_source, + double fill_rate = 1); + + SharedTokenBucketImpl(const SharedTokenBucketImpl&) = delete; + SharedTokenBucketImpl(SharedTokenBucketImpl&&) = delete; + + // TokenBucket + + uint64_t consume(uint64_t tokens, bool allow_partial) override; + uint64_t consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& time_to_next_token) override; + std::chrono::milliseconds nextTokenAvailable() override; + + /** + * Since the token bucket is shared, only the first reset call will work. + * Subsequent calls to reset method will be ignored. + */ + void maybeReset(uint64_t num_tokens) override; + +private: + Thread::MutexBasicLockable mutex_; + TokenBucketImpl impl_ ABSL_GUARDED_BY(mutex_); + bool reset_once_ ABSL_GUARDED_BY(mutex_); + mutable Thread::ThreadSynchronizer synchronizer_; // Used only for testing. + friend class SharedTokenBucketImplTest; +}; + +} // namespace Envoy diff --git a/source/common/common/token_bucket_impl.cc b/source/common/common/token_bucket_impl.cc index 5e7de9e6bb1a7..74dd981e1828d 100644 --- a/source/common/common/token_bucket_impl.cc +++ b/source/common/common/token_bucket_impl.cc @@ -29,6 +29,13 @@ uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial) { return tokens; } +uint64_t TokenBucketImpl::consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& time_to_next_token) { + auto tokens_consumed = consume(tokens, allow_partial); + time_to_next_token = nextTokenAvailable(); + return tokens_consumed; +} + std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { // If there are tokens available, return immediately. if (tokens_ >= 1) { @@ -38,7 +45,7 @@ std::chrono::milliseconds TokenBucketImpl::nextTokenAvailable() { return std::chrono::milliseconds(static_cast(std::ceil((1 / fill_rate_) * 1000))); } -void TokenBucketImpl::reset(uint64_t num_tokens) { +void TokenBucketImpl::maybeReset(uint64_t num_tokens) { ASSERT(num_tokens <= max_tokens_); tokens_ = num_tokens; last_fill_ = time_source_.monotonicTime(); diff --git a/source/common/common/token_bucket_impl.h b/source/common/common/token_bucket_impl.h index 644a4185dd5ab..b168b4cc2894e 100644 --- a/source/common/common/token_bucket_impl.h +++ b/source/common/common/token_bucket_impl.h @@ -22,8 +22,10 @@ class TokenBucketImpl : public TokenBucket { // TokenBucket uint64_t consume(uint64_t tokens, bool allow_partial) override; + uint64_t consume(uint64_t tokens, bool allow_partial, + std::chrono::milliseconds& time_to_next_token) override; std::chrono::milliseconds nextTokenAvailable() override; - void reset(uint64_t num_tokens) override; + void maybeReset(uint64_t num_tokens) override; private: const double max_tokens_; diff --git a/source/extensions/filters/http/fault/fault_filter.cc b/source/extensions/filters/http/fault/fault_filter.cc index 4b98e5583a89c..49da6059b1b83 100644 --- a/source/extensions/filters/http/fault/fault_filter.cc +++ b/source/extensions/filters/http/fault/fault_filter.cc @@ -539,7 +539,7 @@ void StreamRateLimiter::onTokenTimer() { // full 1s of data right away which might not introduce enough delay for a stream that doesn't // have enough data to span more than 1s of rate allowance). Once we reset, we will subsequently // allow for bursting within the second to account for our data provider being bursty. - token_bucket_.reset(1); + token_bucket_.maybeReset(1); saw_data_ = true; } diff --git a/test/common/common/BUILD b/test/common/common/BUILD index 1b52c8229856a..af59c4278c69c 100644 --- a/test/common/common/BUILD +++ b/test/common/common/BUILD @@ -265,6 +265,16 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "shared_token_bucket_impl_test", + srcs = ["shared_token_bucket_impl_test.cc"], + deps = [ + "//source/common/common:shared_token_bucket_impl_lib", + "//test/test_common:simulated_time_system_lib", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "callback_impl_test", srcs = ["callback_impl_test.cc"], diff --git a/test/common/common/shared_token_bucket_impl_test.cc b/test/common/common/shared_token_bucket_impl_test.cc new file mode 100644 index 0000000000000..2a14019bdfc7d --- /dev/null +++ b/test/common/common/shared_token_bucket_impl_test.cc @@ -0,0 +1,199 @@ +#include + +#include "common/common/shared_token_bucket_impl.h" + +#include "test/test_common/simulated_time_system.h" + +#include "gtest/gtest.h" + +namespace Envoy { + +class SharedTokenBucketImplTest : public testing::Test { +protected: + bool isMutexLocked(SharedTokenBucketImpl& token) { + auto locked = token.mutex_.tryLock(); + if (locked) { + token.mutex_.unlock(); + } + return !locked; + } + + Thread::ThreadSynchronizer& synchronizer(SharedTokenBucketImpl& token) { + return token.synchronizer_; + }; + + Event::SimulatedTimeSystem time_system_; + std::chrono::milliseconds time_to_next_token; +}; + +// Verifies TokenBucket initialization. +TEST_F(SharedTokenBucketImplTest, Initialization) { + SharedTokenBucketImpl token_bucket{1, time_system_, -1.0}; + + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); +} + +// Verifies TokenBucket's maximum capacity. +TEST_F(SharedTokenBucketImplTest, MaxBucketSize) { + SharedTokenBucketImpl token_bucket{3, time_system_, 1}; + + EXPECT_EQ(3, token_bucket.consume(3, false, time_to_next_token)); + time_system_.setMonotonicTime(std::chrono::seconds(10)); + EXPECT_EQ(0, token_bucket.consume(4, false, time_to_next_token)); + EXPECT_EQ(3, token_bucket.consume(3, false, time_to_next_token)); +} + +// Verifies that TokenBucket can consume tokens. +TEST_F(SharedTokenBucketImplTest, Consume) { + SharedTokenBucketImpl token_bucket{10, time_system_, 1}; + + EXPECT_EQ(0, token_bucket.consume(20, false, time_to_next_token)); + EXPECT_EQ(9, token_bucket.consume(9, false, time_to_next_token)); + + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(999)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(5999)); + EXPECT_EQ(0, token_bucket.consume(6, false, time_to_next_token)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(6000)); + EXPECT_EQ(6, token_bucket.consume(6, false, time_to_next_token)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); +} + +// Verifies that TokenBucket can refill tokens. +TEST_F(SharedTokenBucketImplTest, Refill) { + SharedTokenBucketImpl token_bucket{1, time_system_, 0.5}; + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + + time_system_.setMonotonicTime(std::chrono::milliseconds(500)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); + time_system_.setMonotonicTime(std::chrono::milliseconds(1500)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); + time_system_.setMonotonicTime(std::chrono::milliseconds(2000)); + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); +} + +TEST_F(SharedTokenBucketImplTest, NextTokenAvailable) { + SharedTokenBucketImpl token_bucket{10, time_system_, 5}; + EXPECT_EQ(9, token_bucket.consume(9, false, time_to_next_token)); + EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + EXPECT_EQ(0, token_bucket.consume(1, false, time_to_next_token)); + EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); +} + +// Test partial consumption of tokens. +TEST_F(SharedTokenBucketImplTest, PartialConsumption) { + SharedTokenBucketImpl token_bucket{16, time_system_, 16}; + EXPECT_EQ(16, token_bucket.consume(18, true, time_to_next_token)); + EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); + time_system_.advanceTimeWait(std::chrono::milliseconds(62)); + EXPECT_EQ(0, token_bucket.consume(1, true, time_to_next_token)); + time_system_.advanceTimeWait(std::chrono::milliseconds(1)); + EXPECT_EQ(1, token_bucket.consume(2, true, time_to_next_token)); + EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); +} + +// Test reset functionality for a shared token bucket. +TEST_F(SharedTokenBucketImplTest, Reset) { + SharedTokenBucketImpl token_bucket{16, time_system_, 16}; + synchronizer(token_bucket).enable(); + // Start a thread and call consume. This will wait post checking reset_once flag. + synchronizer(token_bucket).waitOn(SharedTokenBucketImpl::ResetCheckSyncPoint); + std::thread thread([&] { token_bucket.maybeReset(1); }); + + // Wait until the thread is actually waiting. + synchronizer(token_bucket).barrierOn(SharedTokenBucketImpl::ResetCheckSyncPoint); + + // Mutex should be already locked. + EXPECT_TRUE(isMutexLocked(token_bucket)); + synchronizer(token_bucket).signal(SharedTokenBucketImpl::ResetCheckSyncPoint); + + thread.join(); + EXPECT_FALSE(isMutexLocked(token_bucket)); + + EXPECT_EQ(1, token_bucket.consume(2, true, time_to_next_token)); + EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); + + // Reset again. Should be ignored for shared bucket. + token_bucket.maybeReset(5); + EXPECT_EQ(0, token_bucket.consume(5, true, time_to_next_token)); +} + +// Verifies that TokenBucket can consume tokens with thread safety. +TEST_F(SharedTokenBucketImplTest, SynchronizedConsume) { + SharedTokenBucketImpl token_bucket{10, time_system_, 1}; + + synchronizer(token_bucket).enable(); + // Start a thread and call consume. This will wait post lock. + synchronizer(token_bucket).waitOn(SharedTokenBucketImpl::GetImplSyncPoint); + std::thread thread([&] { EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); }); + + // Wait until the thread is actually waiting. + synchronizer(token_bucket).barrierOn(SharedTokenBucketImpl::GetImplSyncPoint); + + // Mutex should be already locked. + EXPECT_TRUE(isMutexLocked(token_bucket)); + synchronizer(token_bucket).signal(SharedTokenBucketImpl::GetImplSyncPoint); + thread.join(); + EXPECT_FALSE(isMutexLocked(token_bucket)); +} + +TEST_F(SharedTokenBucketImplTest, SynchronizedNextTokenAvailable) { + SharedTokenBucketImpl token_bucket{10, time_system_, 16}; + + synchronizer(token_bucket).enable(); + // Start a thread and call consume. This will wait post lock. + synchronizer(token_bucket).waitOn(SharedTokenBucketImpl::GetImplSyncPoint); + std::thread thread( + [&] { EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); }); + + // Wait until the thread is actually waiting. + synchronizer(token_bucket).barrierOn(SharedTokenBucketImpl::GetImplSyncPoint); + + // Mutex should be already locked. + EXPECT_TRUE(isMutexLocked(token_bucket)); + synchronizer(token_bucket).signal(SharedTokenBucketImpl::GetImplSyncPoint); + thread.join(); + EXPECT_FALSE(isMutexLocked(token_bucket)); +} + +// Verifies that TokenBucket can consume tokens with thread safety. +TEST_F(SharedTokenBucketImplTest, SynchronizedConsumeAndNextToken) { + SharedTokenBucketImpl token_bucket{10, time_system_, 5}; + + // Exhaust all tokens. + EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); + EXPECT_EQ(std::chrono::milliseconds(200), token_bucket.nextTokenAvailable()); + time_system_.advanceTimeWait(std::chrono::milliseconds(400)); + + // Start a thread and call consume to refill tokens. + std::thread t1([&] { EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); }); + + t1.join(); + + EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); + + token_bucket.maybeReset(10); + // Exhaust all tokens. + std::chrono::milliseconds time_to_next_token(0); + EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); + EXPECT_EQ(time_to_next_token.count(), 200); + time_system_.advanceTimeWait(std::chrono::milliseconds(400)); + + // Start a thread and call consume to refill tokens. + std::thread t2([&] { + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + EXPECT_EQ(time_to_next_token.count(), 0); + }); + + t2.join(); + + EXPECT_EQ(std::chrono::milliseconds(0), token_bucket.nextTokenAvailable()); +} + +} // namespace Envoy diff --git a/test/common/common/token_bucket_impl_test.cc b/test/common/common/token_bucket_impl_test.cc index 8e2b6b2b27a59..30e273c024075 100644 --- a/test/common/common/token_bucket_impl_test.cc +++ b/test/common/common/token_bucket_impl_test.cc @@ -88,9 +88,26 @@ TEST_F(TokenBucketImplTest, PartialConsumption) { // Test reset functionality. TEST_F(TokenBucketImplTest, Reset) { TokenBucketImpl token_bucket{16, time_system_, 16}; - token_bucket.reset(1); + token_bucket.maybeReset(1); EXPECT_EQ(1, token_bucket.consume(2, true)); EXPECT_EQ(std::chrono::milliseconds(63), token_bucket.nextTokenAvailable()); } +// Verifies that TokenBucket can consume tokens and return next token time. +TEST_F(TokenBucketImplTest, ConsumeAndNextToken) { + TokenBucketImpl token_bucket{10, time_system_, 5}; + + // Exhaust all tokens. + std::chrono::milliseconds time_to_next_token(0); + EXPECT_EQ(10, token_bucket.consume(20, true, time_to_next_token)); + EXPECT_EQ(time_to_next_token.count(), 200); + EXPECT_EQ(time_to_next_token, token_bucket.nextTokenAvailable()); + time_system_.advanceTimeWait(std::chrono::milliseconds(400)); + + // Start a thread and call consume to refill tokens. + EXPECT_EQ(1, token_bucket.consume(1, false, time_to_next_token)); + EXPECT_EQ(time_to_next_token.count(), 0); + EXPECT_EQ(time_to_next_token, token_bucket.nextTokenAvailable()); +} + } // namespace Envoy