diff --git a/test/extensions/filters/http/adaptive_concurrency/controller/gradient_controller_test.cc b/test/extensions/filters/http/adaptive_concurrency/controller/gradient_controller_test.cc index 3134e30c906c1..a2f39e88d0f07 100644 --- a/test/extensions/filters/http/adaptive_concurrency/controller/gradient_controller_test.cc +++ b/test/extensions/filters/http/adaptive_concurrency/controller/gradient_controller_test.cc @@ -57,7 +57,7 @@ class GradientControllerTest : public testing::Test { stats_, random_, time_system_); // Advance time so that the latency sample calculations don't underflow if monotonic time is 0. - time_system_.advanceTimeAsync(std::chrono::hours(42)); + advanceTimeAndLoop(std::chrono::hours(42)); return config; } @@ -107,6 +107,11 @@ class GradientControllerTest : public testing::Test { .value()); } + template void advanceTimeAndLoop(DurationType duration) { + time_system_.advanceTimeAsync(duration); + dispatcher_->run(Event::Dispatcher::RunType::Block); + } + Event::SimulatedTimeSystem time_system_; Stats::TestUtil::TestStore stats_; NiceMock runtime_; @@ -258,7 +263,7 @@ TEST_F(GradientControllerTest, MinRTTEpoch) { const int min_concurrency = 2; auto controller = makeController(yaml); const auto min_rtt = std::chrono::milliseconds(1350); - time_system_.advanceTimeAsync(min_rtt); + advanceTimeAndLoop(min_rtt); verifyMinRTTActive(); EXPECT_EQ(controller->concurrencyLimit(), min_concurrency); @@ -270,7 +275,7 @@ TEST_F(GradientControllerTest, MinRTTEpoch) { uint32_t last_limit = controller->concurrencyLimit(); for (int i = 0; i < 29; ++i) { tryForward(controller, true); - time_system_.advanceTimeAsync(std::chrono::seconds(1)); + advanceTimeAndLoop(std::chrono::seconds(1)); sampleLatency(controller, min_rtt); dispatcher_->run(Event::Dispatcher::RunType::Block); EXPECT_GT(controller->concurrencyLimit(), last_limit); @@ -286,8 +291,7 @@ TEST_F(GradientControllerTest, MinRTTEpoch) { } // Move into the next minRTT window while the requests are outstanding. - time_system_.advanceTimeAsync(std::chrono::seconds(5)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::seconds(5)); verifyMinRTTActive(); EXPECT_EQ(controller->concurrencyLimit(), min_concurrency); @@ -330,7 +334,7 @@ TEST_F(GradientControllerTest, MinRTTLogicTest) { } tryForward(controller, false); tryForward(controller, false); - time_system_.advanceTimeAsync(min_rtt); + advanceTimeAndLoop(min_rtt); for (int i = 0; i < 7; ++i) { sampleLatency(controller, min_rtt); } @@ -427,8 +431,7 @@ TEST_F(GradientControllerTest, MinRTTBufferTest) { // prevent the concurrency limit from decreasing. sampleLatency(controller, std::chrono::milliseconds(6)); } - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); EXPECT_GT(controller->concurrencyLimit(), last_concurrency); } } @@ -459,8 +462,7 @@ TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) { // Ensure that the concurrency window increases on its own due to the headroom calculation with // the max gradient. - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); EXPECT_GE(controller->concurrencyLimit(), 7); EXPECT_LE(controller->concurrencyLimit() / 7.0, 2.0); @@ -472,8 +474,7 @@ TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) { tryForward(controller, true); sampleLatency(controller, std::chrono::milliseconds(4)); } - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); // Verify the minimum gradient. EXPECT_LE(last_concurrency, controller->concurrencyLimit()); EXPECT_GE(static_cast(last_concurrency) / controller->concurrencyLimit(), 0.5); @@ -486,8 +487,7 @@ TEST_F(GradientControllerTest, ConcurrencyLimitBehaviorTestBasic) { tryForward(controller, true); sampleLatency(controller, std::chrono::milliseconds(6)); } - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); EXPECT_LT(controller->concurrencyLimit(), last_concurrency); EXPECT_GE(controller->concurrencyLimit(), 7); } @@ -513,7 +513,7 @@ TEST_F(GradientControllerTest, MinRTTReturnToPreviousLimit) { // Get initial minRTT measurement out of the way and advance time so request samples are not // thought to come from the previous minRTT epoch. advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); - time_system_.advanceTimeAsync(std::chrono::seconds(1)); + advanceTimeAndLoop(std::chrono::seconds(1)); // Force the limit calculation to run a few times from some measurements. for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { @@ -522,8 +522,7 @@ TEST_F(GradientControllerTest, MinRTTReturnToPreviousLimit) { tryForward(controller, true); sampleLatency(controller, std::chrono::milliseconds(4)); } - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); // Verify the value is growing. EXPECT_GT(controller->concurrencyLimit(), last_concurrency); } @@ -531,12 +530,11 @@ TEST_F(GradientControllerTest, MinRTTReturnToPreviousLimit) { const auto limit_val = controller->concurrencyLimit(); // Wait until the minRTT recalculation is triggered again and verify the limit drops. - time_system_.advanceTimeAsync(std::chrono::seconds(31)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::seconds(31)); EXPECT_EQ(controller->concurrencyLimit(), 3); // Advance time again for request samples to appear from the current epoch. - time_system_.advanceTimeAsync(std::chrono::seconds(1)); + advanceTimeAndLoop(std::chrono::seconds(1)); // 49 more requests should cause the minRTT to be done calculating. for (int i = 0; i < 5; ++i) { @@ -569,7 +567,7 @@ TEST_F(GradientControllerTest, MinRTTRescheduleTest) { // Get initial minRTT measurement out of the way and advance time so request samples are not // thought to come from the previous minRTT epoch. advancePastMinRTTStage(controller, yaml, std::chrono::milliseconds(5)); - time_system_.advanceTimeAsync(std::chrono::seconds(1)); + advanceTimeAndLoop(std::chrono::seconds(1)); // Force the limit calculation to run a few times from some measurements. for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { @@ -578,20 +576,17 @@ TEST_F(GradientControllerTest, MinRTTRescheduleTest) { tryForward(controller, true); sampleLatency(controller, std::chrono::milliseconds(4)); } - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); // Verify the value is growing. EXPECT_GT(controller->concurrencyLimit(), last_concurrency); } // Wait until the minRTT recalculation is triggered again and verify the limit drops. - time_system_.advanceTimeAsync(std::chrono::seconds(31)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::seconds(31)); EXPECT_EQ(controller->concurrencyLimit(), 3); // Verify sample recalculation doesn't occur during the minRTT window. - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); EXPECT_EQ(controller->concurrencyLimit(), 3); } @@ -622,8 +617,7 @@ TEST_F(GradientControllerTest, NoSamplesTest) { tryForward(controller, true); sampleLatency(controller, std::chrono::milliseconds(4)); } - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); // Verify the value is growing. EXPECT_GT(controller->concurrencyLimit(), last_concurrency); } @@ -631,8 +625,7 @@ TEST_F(GradientControllerTest, NoSamplesTest) { // Now we make sure that the limit value doesn't change in the absence of samples. for (int sample_iters = 0; sample_iters < 5; ++sample_iters) { const auto old_limit = controller->concurrencyLimit(); - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); EXPECT_EQ(old_limit, controller->concurrencyLimit()); } } @@ -676,7 +669,7 @@ TEST_F(GradientControllerTest, TimerAccuracyTest) { EXPECT_CALL(*sample_timer, enableTimer(std::chrono::milliseconds(123), _)); for (int i = 0; i < 6; ++i) { tryForward(controller, true); - time_system_.advanceTimeAsync(std::chrono::milliseconds(5)); + advanceTimeAndLoop(std::chrono::milliseconds(5)); sampleLatency(controller, std::chrono::milliseconds(5)); } } @@ -716,7 +709,7 @@ TEST_F(GradientControllerTest, TimerAccuracyTestNoJitter) { EXPECT_CALL(*sample_timer, enableTimer(std::chrono::milliseconds(123), _)); for (int i = 0; i < 6; ++i) { tryForward(controller, true); - time_system_.advanceTimeAsync(std::chrono::milliseconds(5)); + advanceTimeAndLoop(std::chrono::milliseconds(5)); sampleLatency(controller, std::chrono::milliseconds(5)); } } @@ -749,8 +742,7 @@ TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) { // Ensure that the concurrency window increases on its own due to the headroom calculation with // the max gradient. - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); EXPECT_GE(controller->concurrencyLimit(), 7); EXPECT_LE(controller->concurrencyLimit() / 7.0, 2.0); @@ -762,8 +754,7 @@ TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) { tryForward(controller, true); sampleLatency(controller, elevated_latency); } - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); } // Verify that the concurrency limit starts growing with newly measured minRTT. @@ -773,8 +764,7 @@ TEST_F(GradientControllerTest, ConsecutiveMinConcurrencyReset) { tryForward(controller, true); sampleLatency(controller, elevated_latency); } - time_system_.advanceTimeAsync(std::chrono::milliseconds(101)); - dispatcher_->run(Event::Dispatcher::RunType::Block); + advanceTimeAndLoop(std::chrono::milliseconds(101)); EXPECT_GE(controller->concurrencyLimit(), last_concurrency); } } diff --git a/test/extensions/quic_listeners/quiche/active_quic_listener_test.cc b/test/extensions/quic_listeners/quiche/active_quic_listener_test.cc index b41b6bdd311d1..5d7cfdea89d83 100644 --- a/test/extensions/quic_listeners/quiche/active_quic_listener_test.cc +++ b/test/extensions/quic_listeners/quiche/active_quic_listener_test.cc @@ -136,7 +136,8 @@ class ActiveQuicListenerTest : public QuicMultiVersionTest { auto proof_source = std::make_unique(); filter_chain_ = &proof_source->filterChain(); crypto_config_peer.ResetProofSource(std::move(proof_source)); - simulated_time_system_.advanceTimeWait(std::chrono::milliseconds(100)); + simulated_time_system_.advanceTimeAsync(std::chrono::milliseconds(100)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); } Network::ActiveUdpListenerFactoryPtr createQuicListenerFactory(const std::string& yaml) { diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc index c5b6e6c2e7af0..9a9098ea9334e 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc @@ -93,7 +93,8 @@ class EnvoyQuicDispatcherTest : public QuicMultiVersionTest, void SetUp() override { // Advance time a bit because QuicTime regards 0 as uninitialized timestamp. - time_system_.advanceTimeWait(std::chrono::milliseconds(100)); + time_system_.advanceTimeAsync(std::chrono::milliseconds(100)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); EXPECT_CALL(listener_config_, perConnectionBufferLimitBytes()) .WillRepeatedly(Return(1024 * 1024)); } diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc index f2ef9fae069e0..d7a92ef6125b5 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc @@ -669,7 +669,8 @@ TEST_P(EnvoyQuicServerSessionTest, FlushAndWaitForCloseWithTimeout) { EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); // Unblocking the stream shouldn't close the connection as it should be // delayed. - time_system_.advanceTimeWait(std::chrono::milliseconds(10)); + time_system_.advanceTimeAsync(std::chrono::milliseconds(10)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); envoy_quic_session_.OnCanWrite(); // delay close alarm should have been rescheduled. time_system_.advanceTimeAsync(std::chrono::milliseconds(90)); @@ -700,7 +701,9 @@ TEST_P(EnvoyQuicServerSessionTest, FlusWriteTransitToFlushWriteWithDelay) { envoy_quic_session_.close(Network::ConnectionCloseType::FlushWrite); EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); - time_system_.advanceTimeWait(std::chrono::milliseconds(10)); + time_system_.advanceTimeAsync(std::chrono::milliseconds(10)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + // The closing behavior should be changed. envoy_quic_session_.close(Network::ConnectionCloseType::FlushWriteAndDelay); // Unblocking the stream shouldn't close the connection as it should be @@ -732,7 +735,8 @@ TEST_P(EnvoyQuicServerSessionTest, FlushAndWaitForCloseWithNoPendingData) { // Advance the time a bit and try to close again. The delay close timer // shouldn't be rescheduled by this call. - time_system_.advanceTimeWait(std::chrono::milliseconds(10)); + time_system_.advanceTimeAsync(std::chrono::milliseconds(10)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); envoy_quic_session_.close(Network::ConnectionCloseType::FlushWriteAndDelay); EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); diff --git a/test/test_common/simulated_time_system.cc b/test/test_common/simulated_time_system.cc index 4b898ce8ef969..8042bfafb55ae 100644 --- a/test/test_common/simulated_time_system.cc +++ b/test/test_common/simulated_time_system.cc @@ -47,15 +47,165 @@ class UnlockGuard { }; } // namespace +// Each timer is maintained and ordered by a common TimeSystem, but is +// associated with a scheduler. The scheduler creates the timers with a libevent +// context, so that the timer callbacks can be executed via Dispatcher::run() in +// the expected thread. +class SimulatedTimeSystemHelper::SimulatedScheduler : public Scheduler { +public: + SimulatedScheduler(SimulatedTimeSystemHelper& time_system, CallbackScheduler& cb_scheduler) + : time_system_(time_system), cb_scheduler_(cb_scheduler), + update_time_cb_( + cb_scheduler.createSchedulableCallback([this] { updateTimeAndTriggerAlarms(); })), + run_alarms_cb_(cb_scheduler.createSchedulableCallback([this] { runReadyAlarms(); })) { + time_system_.addScheduler(this); + } + ~SimulatedScheduler() override { time_system_.removeScheduler(this); } + + absl::optional minAlarmRegistrationTime(); + + TimerPtr createTimer(const TimerCb& cb, Dispatcher& /*dispatcher*/) override; + + bool isEnabled(Alarm& alarm); + void enableAlarm(Alarm& alarm, const std::chrono::microseconds& duration); + void disableAlarm(Alarm& alarm) { + absl::MutexLock lock(&mutex_); + disableAlarmLockHeld(alarm); + } + + void waitUntilIdle() { + absl::MutexLock lock(&mutex_); + auto triggered_alarms_empty = + +[](AlarmSet* triggered_alarms) { return triggered_alarms->empty(); }; + mutex_.Await(absl::Condition(triggered_alarms_empty, &triggered_alarms_)); + } + + void updateTime(MonotonicTime monotonic_time, SystemTime system_time) { + bool inc_pending = false; + { + absl::MutexLock lock(&mutex_); + next_monotonic_time_ = monotonic_time; + next_system_time_ = system_time; + if (!pending_dec_ && (!registered_alarms_.empty() || !triggered_alarms_.empty())) { + // HACK: selectively increment only on dispatchers that have active alarms to allow the + // pending alarms mechanism to be used to detect when alarms had their times updated, while + // also avoiding waiting for decrements from dispatchers that are either not active or are + // blocked on epoll for a long time because of lack of other events that would wake them up. + // There is a known, but not understood issue with QUIC tests timing out when using this + // version of simulated timers, I'm guessing that the event loops in question do not have a + // real periodic timer that ensures that the max epoll wait ends up being relatively small + // and ensures that events scheduled from outside the worker thread have a chance to + // execute. It may be necessary to have the simulated scheduler add a real periodic timer on + // construction in order to ensure that the event loop remains responsive to external event + // activations. + inc_pending = true; + pending_dec_ = true; + } + } + if (inc_pending) { + time_system_.incPending(); + } + if (!update_time_cb_->enabled()) { + run_alarms_cb_->cancel(); + update_time_cb_->scheduleCallbackCurrentIteration(); + } + } + +private: + void disableAlarmLockHeld(Alarm& alarm) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + void updateTimeAndTriggerAlarms(); + void runReadyAlarms(); + + struct AlarmRegistration { + AlarmRegistration(MonotonicTime time, uint64_t randomness, Alarm& alarm) + : time_(time), randomness_(randomness), alarm_(alarm) {} + + MonotonicTime time_; + // Random tie-breaker for alarms scheduled for the same monotonic time used to mimic + // non-deterministic execution of real alarms scheduled for the same wall time. + uint64_t randomness_; + Alarm& alarm_; + + friend bool operator<(const AlarmRegistration& lhs, const AlarmRegistration& rhs) { + if (lhs.time_ != rhs.time_) { + return lhs.time_ < rhs.time_; + } + if (lhs.randomness_ != rhs.randomness_) { + return lhs.randomness_ < rhs.randomness_; + } + // Out of paranoia, use pointer comparison on the alarms as a final tie-breaker but also + // ASSERT that this branch isn't hit in debug modes since in practice the randomness_ + // associated with two registrations should never be equal. + ASSERT(false, "Alarm registration randomness_ for two alarms should never be equal."); + return &lhs.alarm_ < &rhs.alarm_; + } + }; + + class AlarmSet { + public: + bool empty() { return alarms_.empty(); } + + const AlarmRegistration& next() { return *alarms_.begin(); } + + void add(AlarmRegistration registration) { + auto insert_result = alarms_.insert(registration); + ASSERT(insert_result.second); + alarm_registrations_map_.emplace(®istration.alarm_, insert_result.first); + + // Sanity check that the parallel data structures used for alarm registration have the same + // number of entries. + ASSERT(alarms_.size() == alarm_registrations_map_.size()); + } + + bool remove(Alarm& alarm) { + auto it = alarm_registrations_map_.find(&alarm); + if (it == alarm_registrations_map_.end()) { + return false; + } + alarms_.erase(it->second); + alarm_registrations_map_.erase(it); + return true; + } + + bool contains(Alarm& alarm) const { + return alarm_registrations_map_.find(&alarm) != alarm_registrations_map_.end(); + } + + private: + std::set alarms_; + absl::flat_hash_map::const_iterator> + alarm_registrations_map_; + }; + + absl::Mutex mutex_; + bool not_running_cbs_ ABSL_GUARDED_BY(mutex_) = true; + AlarmSet registered_alarms_ ABSL_GUARDED_BY(mutex_); + AlarmSet triggered_alarms_ ABSL_GUARDED_BY(mutex_); + + MonotonicTime current_monotonic_time_ ABSL_GUARDED_BY(mutex_); + SystemTime current_system_time_ ABSL_GUARDED_BY(mutex_); + + MonotonicTime next_monotonic_time_ ABSL_GUARDED_BY(mutex_); + SystemTime next_system_time_ ABSL_GUARDED_BY(mutex_); + + TestRandomGenerator random_source_ ABSL_GUARDED_BY(mutex_); + uint64_t legacy_next_idx_ ABSL_GUARDED_BY(mutex_) = 0; + bool pending_dec_ ABSL_GUARDED_BY(mutex_) = false; + + SimulatedTimeSystemHelper& time_system_; + CallbackScheduler& cb_scheduler_; + SchedulableCallbackPtr update_time_cb_; + SchedulableCallbackPtr run_alarms_cb_; +}; + // Our simulated alarm inherits from TimerImpl so that the same dispatching // mechanism used in RealTimeSystem timers is employed for simulated alarms. class SimulatedTimeSystemHelper::Alarm : public Timer { public: Alarm(SimulatedScheduler& simulated_scheduler, SimulatedTimeSystemHelper& time_system, - CallbackScheduler& cb_scheduler, TimerCb cb) - : cb_(cb_scheduler.createSchedulableCallback([this, cb] { runAlarm(cb); })), - simulated_scheduler_(simulated_scheduler), time_system_(time_system), armed_(false), - pending_(false) {} + CallbackScheduler& /*cb_scheduler*/, TimerCb cb) + : cb_(cb), simulated_scheduler_(simulated_scheduler), time_system_(time_system) {} ~Alarm() override; @@ -67,122 +217,138 @@ class SimulatedTimeSystemHelper::Alarm : public Timer { }; void enableHRTimer(const std::chrono::microseconds& duration, const ScopeTrackedObject* scope) override; - bool enabled() override { - absl::MutexLock lock(&time_system_.mutex_); - return armed_ || cb_->enabled(); - } - - void disableTimerLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(time_system_.mutex_); - - /** - * Activates the timer so it will be run the next time the libevent loop is run, - * typically via Dispatcher::run(). - */ - void activateLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(time_system_.mutex_) { - ASSERT(armed_); - armed_ = false; - if (pending_) { - return; - } - pending_ = true; - time_system_.incPendingLockHeld(); - - // We don't want to activate the alarm under lock, as it will make a - // libevent call, and libevent itself uses locks: - // https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/event.c#L1917 - // See class comment for UnlockGuard for details on saving - // time_system_.mutex_ prior to running libevent, which may delete this. - UnlockGuard unlocker(time_system_.mutex_); - cb_->scheduleCallbackCurrentIteration(); - } + bool enabled() override { return simulated_scheduler_.isEnabled(*this); } SimulatedTimeSystemHelper& timeSystem() { return time_system_; } -private: - void runAlarm(TimerCb cb) { - { - absl::MutexLock lock(&time_system_.mutex_); - pending_ = false; - } - // Capture time_system_ in a local in case the alarm gets deleted in the callback. - SimulatedTimeSystemHelper& time_system = time_system_; - cb(); - time_system.decPending(); - } + void runAlarm() { cb_(); } - SchedulableCallbackPtr cb_; +private: + TimerCb cb_; SimulatedScheduler& simulated_scheduler_; SimulatedTimeSystemHelper& time_system_; - bool armed_ ABSL_GUARDED_BY(time_system_.mutex_); - bool pending_ ABSL_GUARDED_BY(time_system_.mutex_); }; -// Each timer is maintained and ordered by a common TimeSystem, but is -// associated with a scheduler. The scheduler creates the timers with a libevent -// context, so that the timer callbacks can be executed via Dispatcher::run() in -// the expected thread. -class SimulatedTimeSystemHelper::SimulatedScheduler : public Scheduler { -public: - SimulatedScheduler(SimulatedTimeSystemHelper& time_system, CallbackScheduler& cb_scheduler) - : time_system_(time_system), cb_scheduler_(cb_scheduler), - schedule_ready_alarms_cb_(cb_scheduler.createSchedulableCallback( - [this] { time_system_.scheduleReadyAlarms(); })) {} - TimerPtr createTimer(const TimerCb& cb, Dispatcher& /*dispatcher*/) override { - return std::make_unique(*this, time_system_, cb_scheduler_, - cb); - }; - void scheduleReadyAlarms() { schedule_ready_alarms_cb_->scheduleCallbackNextIteration(); } - -private: - SimulatedTimeSystemHelper& time_system_; - CallbackScheduler& cb_scheduler_; - SchedulableCallbackPtr schedule_ready_alarms_cb_; -}; +absl::optional +SimulatedTimeSystemHelper::SimulatedScheduler::minAlarmRegistrationTime() { + absl::MutexLock lock(&mutex_); + if (!triggered_alarms_.empty()) { + if (!registered_alarms_.empty()) { + return std::min(triggered_alarms_.next().time_, registered_alarms_.next().time_); + } + return triggered_alarms_.next().time_; + } -SimulatedTimeSystemHelper::Alarm::Alarm::~Alarm() { - if (armed_) { - disableTimer(); + if (!registered_alarms_.empty()) { + return registered_alarms_.next().time_; } + + return absl::nullopt; } -void SimulatedTimeSystemHelper::Alarm::Alarm::disableTimer() { - cb_->cancel(); - absl::MutexLock lock(&time_system_.mutex_); - disableTimerLockHeld(); +TimerPtr SimulatedTimeSystemHelper::SimulatedScheduler::createTimer(const TimerCb& cb, + Dispatcher& /*dispatcher*/) { + return std::make_unique(*this, time_system_, cb_scheduler_, cb); +} + +bool SimulatedTimeSystemHelper::SimulatedScheduler::isEnabled(Alarm& alarm) { + absl::MutexLock lock(&mutex_); + return registered_alarms_.contains(alarm) || triggered_alarms_.contains(alarm); } -void SimulatedTimeSystemHelper::Alarm::Alarm::disableTimerLockHeld() { - if (armed_) { - time_system_.removeAlarmLockHeld(*this); - armed_ = false; +void SimulatedTimeSystemHelper::SimulatedScheduler::enableAlarm( + Alarm& alarm, const std::chrono::microseconds& duration) { + { + absl::MutexLock lock(&mutex_); + if (duration.count() == 0 && triggered_alarms_.contains(alarm)) { + return; + } else if (Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.activate_timers_next_event_loop")) { + disableAlarmLockHeld(alarm); + registered_alarms_.add({current_monotonic_time_ + duration, random_source_.random(), alarm}); + } else { + disableAlarmLockHeld(alarm); + AlarmSet& alarm_set = (duration.count() != 0) ? registered_alarms_ : triggered_alarms_; + alarm_set.add({current_monotonic_time_ + duration, ++legacy_next_idx_, alarm}); + } + } + + if (duration.count() == 0) { + if (Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.activate_timers_next_event_loop")) { + run_alarms_cb_->scheduleCallbackNextIteration(); + } else { + run_alarms_cb_->scheduleCallbackCurrentIteration(); + } } - if (pending_) { - pending_ = false; - time_system_.decPendingLockHeld(); +} + +void SimulatedTimeSystemHelper::SimulatedScheduler::disableAlarmLockHeld(Alarm& alarm) { + if (triggered_alarms_.contains(alarm)) { + ASSERT(!registered_alarms_.contains(alarm)); + triggered_alarms_.remove(alarm); + } else { + ASSERT(!triggered_alarms_.contains(alarm)); + registered_alarms_.remove(alarm); } } -void SimulatedTimeSystemHelper::Alarm::Alarm::enableHRTimer( - const std::chrono::microseconds& duration, const ScopeTrackedObject* /*scope*/) { - if (duration.count() != 0) { - disableTimer(); +void SimulatedTimeSystemHelper::SimulatedScheduler::updateTimeAndTriggerAlarms() { + bool dec_pending = false; + { + absl::MutexLock lock(&mutex_); + current_monotonic_time_ = next_monotonic_time_; + current_system_time_ = next_system_time_; + if (pending_dec_) { + dec_pending = true; + pending_dec_ = false; + } } - absl::MutexLock lock(&time_system_.mutex_); - if (pending_) { - // Calling enableTimer on a timer that is already pending is a no-op. Timer will still fire - // based on the original time it was scheduled. - return; - } else if (armed_) { - disableTimerLockHeld(); + run_alarms_cb_->cancel(); + runReadyAlarms(); + if (dec_pending) { + time_system_.decPending(); } +} - armed_ = true; - if (duration.count() == 0 && !Runtime::runtimeFeatureEnabled( - "envoy.reloadable_features.activate_timers_next_event_loop")) { - activateLockHeld(); - } else { - time_system_.addAlarmLockHeld(*this, duration, simulated_scheduler_); +void SimulatedTimeSystemHelper::SimulatedScheduler::runReadyAlarms() { + absl::MutexLock lock(&mutex_); + auto monotonic_time = current_monotonic_time_; + // TODO delay alarms scheduled this iteration until next iteration. + while (!registered_alarms_.empty()) { + const AlarmRegistration& alarm_registration = registered_alarms_.next(); + MonotonicTime alarm_time = alarm_registration.time_; + if (alarm_time > monotonic_time) { + break; + } + triggered_alarms_.add(alarm_registration); + registered_alarms_.remove(alarm_registration.alarm_); } + + ASSERT(not_running_cbs_); + not_running_cbs_ = false; + while (!triggered_alarms_.empty()) { + Alarm& alarm = triggered_alarms_.next().alarm_; + triggered_alarms_.remove(alarm); + UnlockGuard unlocker(mutex_); + alarm.runAlarm(); + } + ASSERT(!not_running_cbs_); + not_running_cbs_ = true; +} + +SimulatedTimeSystemHelper::Alarm::Alarm::~Alarm() { + simulated_scheduler_.disableAlarm(*this); + simulated_scheduler_.waitUntilIdle(); +} + +void SimulatedTimeSystemHelper::Alarm::Alarm::disableTimer() { + simulated_scheduler_.disableAlarm(*this); +} + +void SimulatedTimeSystemHelper::Alarm::Alarm::enableHRTimer( + const std::chrono::microseconds& duration, const ScopeTrackedObject* /*scope*/) { + simulated_scheduler_.enableAlarm(*this, duration); } // It would be very confusing if there were more than one simulated time system @@ -240,54 +406,6 @@ void SimulatedTimeSystemHelper::waitForNoPendingLockHeld() const &pending_alarms_)); } -void SimulatedTimeSystemHelper::alarmActivateLockHeld(Alarm& alarm) ABSL_NO_THREAD_SAFETY_ANALYSIS { - // We disable thread-safety analysis as the compiler can't detect that - // alarm_.timeSystem() == this, so we must be holding the right mutex. - ASSERT(&(alarm.timeSystem()) == this); - alarm.activateLockHeld(); -} - -void SimulatedTimeSystemHelper::addAlarmLockHeld( - Alarm& alarm, const std::chrono::microseconds& duration, - SimulatedScheduler& simulated_scheduler) ABSL_NO_THREAD_SAFETY_ANALYSIS { - ASSERT(&(alarm.timeSystem()) == this); - ASSERT(alarms_.size() == alarm_registrations_map_.size()); - ASSERT(alarm_registrations_map_.find(&alarm) == alarm_registrations_map_.end()); - - auto insert_result = alarms_.insert({monotonic_time_ + duration, random_source_.random(), alarm}); - ASSERT(insert_result.second); - alarm_registrations_map_.emplace(&alarm, insert_result.first); - if (duration.count() == 0) { - // Force the event loop to check for timers that are ready to execute since we just added an 0 - // delay alarm which is ready to execution in the next iteration of the event loop. - // TODO(antoniovicente) Refactor alarm tracking so it happens per scheduler and limit wakeup to - // a single event loop. - - // We don't want to activate the alarm under lock, as it will make a libevent call, and libevent - // itself uses locks: - // https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/event.c#L1917 - UnlockGuard unlocker(mutex_); - simulated_scheduler.scheduleReadyAlarms(); - } - - // Sanity check that the parallel data structures used for alarm registration have the same number - // of entries. - ASSERT(alarms_.size() == alarm_registrations_map_.size()); -} - -void SimulatedTimeSystemHelper::removeAlarmLockHeld(Alarm& alarm) { - ASSERT(alarms_.size() == alarm_registrations_map_.size()); - - auto it = alarm_registrations_map_.find(&alarm); - ASSERT(it != alarm_registrations_map_.end()); - alarms_.erase(it->second); - alarm_registrations_map_.erase(it); - - // Sanity check that the parallel data structures used for alarm registration have the same number - // of entries. - ASSERT(alarms_.size() == alarm_registrations_map_.size()); -} - SchedulerPtr SimulatedTimeSystemHelper::createScheduler(Scheduler& /*base_scheduler*/, CallbackScheduler& cb_scheduler) { return std::make_unique(*this, cb_scheduler); @@ -304,30 +422,11 @@ void SimulatedTimeSystemHelper::setMonotonicTimeLockHeld(const MonotonicTime& mo system_time_ += std::chrono::duration_cast(monotonic_time - monotonic_time_); monotonic_time_ = monotonic_time; - scheduleReadyAlarmsLockHeld(); - } -} - -void SimulatedTimeSystemHelper::scheduleReadyAlarms() { - absl::MutexLock lock(&mutex_); - scheduleReadyAlarmsLockHeld(); -} - -void SimulatedTimeSystemHelper::scheduleReadyAlarmsLockHeld() { - // Alarms is a std::set ordered by wakeup time, so pulling off begin() each - // iteration gives you wakeup order. Also note that alarms may be added - // or removed during the call to activate() so it would not be correct to - // range-iterate over the set. - while (!alarms_.empty()) { - const AlarmRegistration& alarm_registration = *alarms_.begin(); - MonotonicTime alarm_time = alarm_registration.time_; - if (alarm_time > monotonic_time_) { - break; + // TODO release lock, protect by Await function that protects updating time boolean? + for (SimulatedScheduler* scheduler : schedulers_) { + UnlockGuard unlocker(mutex_); + scheduler->updateTime(monotonic_time_, system_time_); } - - Alarm& alarm = alarm_registration.alarm_; - removeAlarmLockHeld(alarm); - alarmActivateLockHeld(alarm); } } diff --git a/test/test_common/simulated_time_system.h b/test/test_common/simulated_time_system.h index 70b0e6a6b41fa..12e2e489e2e5e 100644 --- a/test/test_common/simulated_time_system.h +++ b/test/test_common/simulated_time_system.h @@ -62,32 +62,16 @@ class SimulatedTimeSystemHelper : public TestTimeSystem { private: class SimulatedScheduler; class Alarm; - friend class Alarm; // Needed to reference mutex for thread annotations. - struct AlarmRegistration { - AlarmRegistration(MonotonicTime time, uint64_t randomness, Alarm& alarm) - : time_(time), randomness_(randomness), alarm_(alarm) {} - - MonotonicTime time_; - // Random tie-breaker for alarms scheduled for the same monotonic time used to mimic - // non-deterministic execution of real alarms scheduled for the same wall time. - uint64_t randomness_; - Alarm& alarm_; - - friend bool operator<(const AlarmRegistration& lhs, const AlarmRegistration& rhs) { - if (lhs.time_ != rhs.time_) { - return lhs.time_ < rhs.time_; - } - if (lhs.randomness_ != rhs.randomness_) { - return lhs.randomness_ < rhs.randomness_; - } - // Out of paranoia, use pointer comparison on the alarms as a final tie-breaker but also - // ASSERT that this branch isn't hit in debug modes since in practice the randomness_ - // associated with two registrations should never be equal. - ASSERT(false, "Alarm registration randomness_ for two alarms should never be equal."); - return &lhs.alarm_ < &rhs.alarm_; - } - }; - using AlarmSet = std::set; + + void addScheduler(SimulatedScheduler* scheduler) { + absl::MutexLock lock(&mutex_); + schedulers_.insert(scheduler); + } + + void removeScheduler(SimulatedScheduler* scheduler) { + absl::MutexLock lock(&mutex_); + schedulers_.erase(scheduler); + } /** * Sets the time forward monotonically. If the supplied argument moves @@ -100,37 +84,23 @@ class SimulatedTimeSystemHelper : public TestTimeSystem { void setMonotonicTimeLockHeld(const MonotonicTime& monotonic_time) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); - /** - * Schedule expired alarms so they execute in their event loops. - */ - void scheduleReadyAlarms(); - void scheduleReadyAlarmsLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); - - void alarmActivateLockHeld(Alarm& alarm) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); - - // Adds/removes an alarm. - void addAlarmLockHeld(Alarm&, const std::chrono::microseconds& duration, - SimulatedScheduler& simulated_scheduler) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); - void removeAlarmLockHeld(Alarm&) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); - // Keeps track of how many alarms have been activated but not yet called, // which helps waitFor() determine when to give up and declare a timeout. - void incPendingLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { ++pending_alarms_; } + void incPending() { + absl::MutexLock lock(&mutex_); + ++pending_alarms_; + } void decPending() { absl::MutexLock lock(&mutex_); - decPendingLockHeld(); + --pending_alarms_; } - void decPendingLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { --pending_alarms_; } void waitForNoPendingLockHeld() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); RealTimeSource real_time_source_; // Used to initialize monotonic_time_ and system_time_; MonotonicTime monotonic_time_ ABSL_GUARDED_BY(mutex_); SystemTime system_time_ ABSL_GUARDED_BY(mutex_); TestRandomGenerator random_source_ ABSL_GUARDED_BY(mutex_); - AlarmSet alarms_ ABSL_GUARDED_BY(mutex_); - absl::flat_hash_map - alarm_registrations_map_ ABSL_GUARDED_BY(mutex_); + std::set schedulers_ ABSL_GUARDED_BY(mutex_); mutable absl::Mutex mutex_; uint32_t pending_alarms_ ABSL_GUARDED_BY(mutex_); }; diff --git a/test/test_common/simulated_time_system_test.cc b/test/test_common/simulated_time_system_test.cc index 4dd55c610fa42..b887594c5d7cd 100644 --- a/test/test_common/simulated_time_system_test.cc +++ b/test/test_common/simulated_time_system_test.cc @@ -124,9 +124,13 @@ TEST_P(SimulatedTimeSystemTest, TimerPartialOrdering) { timers_.clear(); } - // Execution order of timers 1 and 2 is non-deterministic because the two timers were scheduled - // for the same time. Verify that both orderings were observed. - EXPECT_THAT(outputs, testing::ElementsAre("p0123", "p0213")); + if (activateMode() == ActivateMode::DelayActivateTimers) { + // Execution order of timers 1 and 2 is non-deterministic because the two timers were scheduled + // for the same time. Verify that both orderings were observed. + EXPECT_THAT(outputs, testing::ElementsAre("p0123", "p0213")); + } else { + EXPECT_THAT(outputs, testing::ElementsAre("p0123")); + } } TEST_P(SimulatedTimeSystemTest, TimerPartialOrdering2) { @@ -150,9 +154,13 @@ TEST_P(SimulatedTimeSystemTest, TimerPartialOrdering2) { timers_.clear(); } - // Execution order of timers 1 and 2 is non-deterministic because the two timers were scheduled - // for the same time. Verify that both orderings were observed. - EXPECT_THAT(outputs, testing::ElementsAre("p0p123", "p0p213")); + if (activateMode() == ActivateMode::DelayActivateTimers) { + // Execution order of timers 1 and 2 is non-deterministic because the two timers were scheduled + // for the same time. Verify that both orderings were observed. + EXPECT_THAT(outputs, testing::ElementsAre("p0p123", "p0p213")); + } else { + EXPECT_THAT(outputs, testing::ElementsAre("p0p123")); + } } // Timers that are scheduled to execute and but are disabled first do not trigger. @@ -484,6 +492,22 @@ TEST_P(SimulatedTimeSystemTest, Enabled) { EXPECT_TRUE(timer->enabled()); } +TEST_P(SimulatedTimeSystemTest, DeleteTimerFromThread) { + TimerPtr timer = scheduler_->createTimer([]() {}, dispatcher_); + timer->enableTimer(std::chrono::milliseconds(0)); + auto thread = Thread::threadFactoryForTest().createThread([&timer]() { timer.reset(); }); + advanceMsAndLoop(1); + thread->join(); +} + +TEST_P(SimulatedTimeSystemTest, DeleteTimerFromThread2) { + TimerPtr timer = scheduler_->createTimer([]() {}, dispatcher_); + timer->enableTimer(std::chrono::milliseconds(1)); + auto thread = Thread::threadFactoryForTest().createThread([&timer]() { timer.reset(); }); + advanceMsAndLoop(1); + thread->join(); +} + } // namespace } // namespace Test } // namespace Event diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 1caf69b4da80c..870e58379b5d9 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -367,6 +367,7 @@ absl accesslog accessor accessors +activations acks acls addr