diff --git a/source/common/rate_limiter_impl.cc b/source/common/rate_limiter_impl.cc index 89e548353..3432c34e1 100644 --- a/source/common/rate_limiter_impl.cc +++ b/source/common/rate_limiter_impl.cc @@ -52,6 +52,36 @@ void BurstingRateLimiter::releaseOne() { previously_releasing_ = absl::nullopt; } +ScheduledStartingRateLimiter::ScheduledStartingRateLimiter( + RateLimiterPtr&& rate_limiter, Envoy::MonotonicTime scheduled_starting_time) + : ForwardingRateLimiterImpl(std::move(rate_limiter)), + scheduled_starting_time_(scheduled_starting_time) { + if (timeSource().monotonicTime() >= scheduled_starting_time_) { + throw NighthawkException("Scheduled starting time needs to be in the future"); + } +} + +bool ScheduledStartingRateLimiter::tryAcquireOne() { + if (timeSource().monotonicTime() < scheduled_starting_time_) { + aquisition_attempted_ = true; + return false; + } + // If we start forwarding right away on the first attempt that is remarkable, so leave a hint + // about this happening in the logs. + if (!aquisition_attempted_) { + aquisition_attempted_ = true; + ENVOY_LOG(warn, "ScheduledStartingRateLimiter: first acquisition attempt was late"); + } + return rate_limiter_->tryAcquireOne(); +} + +void ScheduledStartingRateLimiter::releaseOne() { + if (timeSource().monotonicTime() < scheduled_starting_time_) { + throw NighthawkException("Unexpected call to releaseOne()"); + } + return rate_limiter_->releaseOne(); +} + LinearRateLimiter::LinearRateLimiter(Envoy::TimeSource& time_source, const Frequency frequency) : RateLimiterBaseImpl(time_source), acquireable_count_(0), acquired_count_(0), frequency_(frequency) { diff --git a/source/common/rate_limiter_impl.h b/source/common/rate_limiter_impl.h index b32392b2a..1fa97a054 100644 --- a/source/common/rate_limiter_impl.h +++ b/source/common/rate_limiter_impl.h @@ -112,6 +112,27 @@ class BurstingRateLimiter : public ForwardingRateLimiterImpl, absl::optional previously_releasing_; // Solely used for sanity checking. }; +/** + * Rate limiter that only starts forwarding calls to the wrapped rate limiter + * after it is time to start. + */ +class ScheduledStartingRateLimiter : public ForwardingRateLimiterImpl, + public Envoy::Logger::Loggable { +public: + /** + * @param rate_limiter The rate limiter that will be forwarded to once it is time to start. + * @param scheduled_starting_time The starting time + */ + ScheduledStartingRateLimiter(RateLimiterPtr&& rate_limiter, + Envoy::MonotonicTime scheduled_starting_time); + bool tryAcquireOne() override; + void releaseOne() override; + +private: + const Envoy::MonotonicTime scheduled_starting_time_; + bool aquisition_attempted_{false}; +}; + /** * The consuming rate limiter will hold off opening up until the initial point in time plus the * offset obtained via the delegate have transpired. diff --git a/test/rate_limiter_test.cc b/test/rate_limiter_test.cc index 82816656c..ccbb03543 100644 --- a/test/rate_limiter_test.cc +++ b/test/rate_limiter_test.cc @@ -68,6 +68,58 @@ TEST_F(RateLimiterTest, BurstingRateLimiterTest) { EXPECT_FALSE(rate_limiter->tryAcquireOne()); } +TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTest) { + Envoy::Event::SimulatedTimeSystem time_system; + const auto schedule_delay = 10ms; + // We test regular flow, but also the flow where the first aquisition attempt comes after the + // scheduled delay. This should be business as usual from a functional perspective, but internally + // this rate limiter specializes on this case to log a warning message, and we want to cover that. + for (const bool starting_late : std::vector{false, true}) { + const Envoy::MonotonicTime scheduled_starting_time = + time_system.monotonicTime() + schedule_delay; + std::unique_ptr mock_rate_limiter = std::make_unique(); + MockRateLimiter& unsafe_mock_rate_limiter = *mock_rate_limiter; + InSequence s; + + EXPECT_CALL(unsafe_mock_rate_limiter, timeSource) + .Times(AtLeast(1)) + .WillRepeatedly(ReturnRef(time_system)); + RateLimiterPtr rate_limiter = std::make_unique( + std::move(mock_rate_limiter), scheduled_starting_time); + EXPECT_CALL(unsafe_mock_rate_limiter, tryAcquireOne) + .Times(AtLeast(1)) + .WillRepeatedly(Return(true)); + + if (starting_late) { + time_system.sleep(schedule_delay); + } + + // We should expect zero releases until it is time to start. + while (time_system.monotonicTime() < scheduled_starting_time) { + EXPECT_FALSE(rate_limiter->tryAcquireOne()); + time_system.sleep(1ms); + } + + // Now that is time to start, the rate limiter should propagate to the mock rate limiter. + EXPECT_TRUE(rate_limiter->tryAcquireOne()); + } +} + +TEST_F(RateLimiterTest, ScheduledStartingRateLimiterTestBadArgs) { + Envoy::Event::SimulatedTimeSystem time_system; + // Verify we enforce future-only scheduling. + for (const auto timing : std::vector{time_system.monotonicTime(), + time_system.monotonicTime() - 10ms}) { + std::unique_ptr mock_rate_limiter = std::make_unique(); + MockRateLimiter& unsafe_mock_rate_limiter = *mock_rate_limiter; + EXPECT_CALL(unsafe_mock_rate_limiter, timeSource) + .Times(AtLeast(1)) + .WillRepeatedly(ReturnRef(time_system)); + EXPECT_THROW(ScheduledStartingRateLimiter(std::move(mock_rate_limiter), timing); + , NighthawkException); + } +} + class BurstingRateLimiterIntegrationTest : public Test { public: void testBurstSize(const uint64_t burst_size, const Frequency frequency) {