diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 35fabc8f63b02..556fcdf784db0 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -22,6 +22,8 @@ namespace Envoy { namespace Stats { +const char ThreadLocalStoreImpl::DeleteScopeSync[] = "delete-scope"; +const char ThreadLocalStoreImpl::IterateScopeSync[] = "iterate-scope"; const char ThreadLocalStoreImpl::MainDispatcherCleanupSync[] = "main-dispatcher-cleanup"; ThreadLocalStoreImpl::ThreadLocalStoreImpl(Allocator& alloc) @@ -48,10 +50,10 @@ ThreadLocalStoreImpl::~ThreadLocalStoreImpl() { } void ThreadLocalStoreImpl::setHistogramSettings(HistogramSettingsConstPtr&& histogram_settings) { - Thread::LockGuard lock(lock_); - for (ScopeImpl* scope : scopes_) { - ASSERT(scope->central_cache_->histograms_.empty()); - } + iterateScopes([](const ScopeImplSharedPtr& scope) -> bool { + ASSERT(scope->centralCacheLockHeld()->histograms_.empty()); + return true; + }); histogram_settings_ = std::move(histogram_settings); } @@ -67,21 +69,23 @@ void ThreadLocalStoreImpl::setStatsMatcher(StatsMatcherPtr&& stats_matcher) { // be no copies in TLS caches. 
Thread::LockGuard lock(lock_); const uint32_t first_histogram_index = deleted_histograms_.size(); - for (ScopeImpl* scope : scopes_) { - removeRejectedStats(scope->central_cache_->counters_, + iterateScopesLockHeld([this](const ScopeImplSharedPtr& scope) ABSL_EXCLUSIVE_LOCKS_REQUIRED( + lock_) -> bool { + const CentralCacheEntrySharedPtr& central_cache = scope->centralCacheLockHeld(); + removeRejectedStats(central_cache->counters_, [this](const CounterSharedPtr& counter) mutable { alloc_.markCounterForDeletion(counter); }); removeRejectedStats( - scope->central_cache_->gauges_, + central_cache->gauges_, [this](const GaugeSharedPtr& gauge) mutable { alloc_.markGaugeForDeletion(gauge); }); - removeRejectedStats(scope->central_cache_->histograms_, deleted_histograms_); + removeRejectedStats(central_cache->histograms_, deleted_histograms_); removeRejectedStats( - scope->central_cache_->text_readouts_, - [this](const TextReadoutSharedPtr& text_readout) mutable { + central_cache->text_readouts_, [this](const TextReadoutSharedPtr& text_readout) mutable { alloc_.markTextReadoutForDeletion(text_readout); }); - } + return true; + }); // Remove any newly rejected histograms from histogram_set_. { @@ -151,7 +155,7 @@ ScopeSharedPtr ThreadLocalStoreImpl::createScope(const std::string& name) { ScopeSharedPtr ThreadLocalStoreImpl::scopeFromStatName(StatName name) { auto new_scope = std::make_shared(*this, name); Thread::LockGuard lock(lock_); - scopes_.emplace(new_scope.get()); + scopes_[new_scope.get()] = std::weak_ptr(new_scope); return new_scope; } @@ -283,7 +287,7 @@ void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) { // VirtualHosts. 
bool need_post = scopes_to_cleanup_.empty(); scopes_to_cleanup_.push_back(scope->scope_id_); - central_cache_entries_to_cleanup_.push_back(scope->central_cache_); + central_cache_entries_to_cleanup_.push_back(scope->centralCacheLockHeld()); lock.release(); if (need_post) { @@ -386,6 +390,11 @@ ThreadLocalStoreImpl::ScopeImpl::ScopeImpl(ThreadLocalStoreImpl& parent, StatNam central_cache_(new CentralCacheEntry(parent.alloc_.symbolTable())) {} ThreadLocalStoreImpl::ScopeImpl::~ScopeImpl() { + // Helps reproduce a previous race condition by pausing here in tests while we + // loop over scopes. 'this' will not have been removed from the scopes_ table + // yet, so we need to be careful. + parent_.sync_.syncPoint(DeleteScopeSync); + // Note that scope iteration is thread-safe due to the lock held in // releaseScopeCrossThread. For more details see the comment in // `ThreadLocalStoreImpl::iterHelper`, and the lock it takes prior to the loop. @@ -460,6 +469,44 @@ bool ThreadLocalStoreImpl::checkAndRememberRejection(StatName name, return false; } +CounterOptConstRef ThreadLocalStoreImpl::findCounter(StatName name) const { + CounterOptConstRef found_counter; + iterateScopes([&found_counter, name](const ScopeImplSharedPtr& scope) -> bool { + found_counter = + scope->findStatLockHeld(name, scope->centralCacheLockHeld()->counters_); + return !found_counter.has_value(); + }); + return found_counter; +} + +GaugeOptConstRef ThreadLocalStoreImpl::findGauge(StatName name) const { + GaugeOptConstRef found_gauge; + iterateScopes([&found_gauge, name](const ScopeImplSharedPtr& scope) -> bool { + found_gauge = scope->findStatLockHeld(name, scope->centralCacheLockHeld()->gauges_); + return !found_gauge.has_value(); + }); + return found_gauge; +} + +HistogramOptConstRef ThreadLocalStoreImpl::findHistogram(StatName name) const { + HistogramOptConstRef found_histogram; + iterateScopes([&found_histogram, name](const ScopeImplSharedPtr& scope) -> bool { + found_histogram = 
scope->findHistogramLockHeld(name); + return !found_histogram.has_value(); + }); + return found_histogram; +} + +TextReadoutOptConstRef ThreadLocalStoreImpl::findTextReadout(StatName name) const { + TextReadoutOptConstRef found_text_readout; + iterateScopes([&found_text_readout, name](const ScopeImplSharedPtr& scope) -> bool { + found_text_readout = + scope->findStatLockHeld(name, scope->centralCacheLockHeld()->text_readouts_); + return !found_text_readout.has_value(); + }); + return found_text_readout; +} + template StatType& ThreadLocalStoreImpl::ScopeImpl::safeMakeStat( StatName full_stat_name, StatName name_no_tags, @@ -512,20 +559,6 @@ StatType& ThreadLocalStoreImpl::ScopeImpl::safeMakeStat( return ret; } -template -using StatTypeOptConstRef = absl::optional>; - -template -StatTypeOptConstRef ThreadLocalStoreImpl::ScopeImpl::findStatLockHeld( - StatName name, StatNameHashMap>& central_cache_map) const { - auto iter = central_cache_map.find(name); - if (iter == central_cache_map.end()) { - return absl::nullopt; - } - - return std::cref(*iter->second); -} - Counter& ThreadLocalStoreImpl::ScopeImpl::counterFromStatNameWithTags( const StatName& name, StatNameTagVectorOptConstRef stat_name_tags) { if (parent_.rejectsAll()) { @@ -551,9 +584,10 @@ Counter& ThreadLocalStoreImpl::ScopeImpl::counterFromStatNameWithTags( tls_rejected_stats = &entry.rejected_stats_; } + const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis(); return safeMakeStat( - final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache_->counters_, - fast_reject_result, central_cache_->rejected_stats_, + final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->counters_, + fast_reject_result, central_cache->rejected_stats_, [](Allocator& allocator, StatName name, StatName tag_extracted_name, const StatNameTagVector& tags) -> CounterSharedPtr { return allocator.makeCounter(name, tag_extracted_name, tags); @@ -602,9 +636,10 @@ Gauge& 
ThreadLocalStoreImpl::ScopeImpl::gaugeFromStatNameWithTags( tls_rejected_stats = &entry.rejected_stats_; } + const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis(); Gauge& gauge = safeMakeStat( - final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache_->gauges_, - fast_reject_result, central_cache_->rejected_stats_, + final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->gauges_, + fast_reject_result, central_cache->rejected_stats_, [import_mode](Allocator& allocator, StatName name, StatName tag_extracted_name, const StatNameTagVector& tags) -> GaugeSharedPtr { return allocator.makeGauge(name, tag_extracted_name, tags, import_mode); @@ -616,6 +651,8 @@ Gauge& ThreadLocalStoreImpl::ScopeImpl::gaugeFromStatNameWithTags( Histogram& ThreadLocalStoreImpl::ScopeImpl::histogramFromStatNameWithTags( const StatName& name, StatNameTagVectorOptConstRef stat_name_tags, Histogram::Unit unit) { + // See safety analysis comment in counterFromStatNameWithTags above. 
+ if (parent_.rejectsAll()) { return parent_.null_histogram_; } @@ -646,12 +683,13 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogramFromStatNameWithTags( } Thread::LockGuard lock(parent_.lock_); - auto iter = central_cache_->histograms_.find(final_stat_name); + const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis(); + auto iter = central_cache->histograms_.find(final_stat_name); ParentHistogramImplSharedPtr* central_ref = nullptr; - if (iter != central_cache_->histograms_.end()) { + if (iter != central_cache->histograms_.end()) { central_ref = &iter->second; } else if (parent_.checkAndRememberRejection(final_stat_name, fast_reject_result, - central_cache_->rejected_stats_, + central_cache->rejected_stats_, tls_rejected_stats)) { return parent_.null_histogram_; } else { @@ -676,7 +714,7 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogramFromStatNameWithTags( } } - central_ref = ¢ral_cache_->histograms_[stat->statName()]; + central_ref = ¢ral_cache->histograms_[stat->statName()]; *central_ref = stat; } @@ -711,9 +749,10 @@ TextReadout& ThreadLocalStoreImpl::ScopeImpl::textReadoutFromStatNameWithTags( tls_rejected_stats = &entry.rejected_stats_; } + const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis(); return safeMakeStat( - final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache_->text_readouts_, - fast_reject_result, central_cache_->rejected_stats_, + final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->text_readouts_, + fast_reject_result, central_cache->rejected_stats_, [](Allocator& allocator, StatName name, StatName tag_extracted_name, const StatNameTagVector& tags) -> TextReadoutSharedPtr { return allocator.makeTextReadout(name, tag_extracted_name, tags); @@ -722,14 +761,22 @@ TextReadout& ThreadLocalStoreImpl::ScopeImpl::textReadoutFromStatNameWithTags( } CounterOptConstRef ThreadLocalStoreImpl::ScopeImpl::findCounter(StatName name) const { + 
Thread::LockGuard lock(parent_.lock_); return findStatLockHeld(name, central_cache_->counters_); } GaugeOptConstRef ThreadLocalStoreImpl::ScopeImpl::findGauge(StatName name) const { + Thread::LockGuard lock(parent_.lock_); return findStatLockHeld(name, central_cache_->gauges_); } HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogram(StatName name) const { + Thread::LockGuard lock(parent_.lock_); + return findHistogramLockHeld(name); +} + +HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogramLockHeld(StatName name) const + ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_.lock_) { auto iter = central_cache_->histograms_.find(name); if (iter == central_cache_->histograms_.end()) { return absl::nullopt; @@ -740,6 +787,7 @@ HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogram(StatName nam } TextReadoutOptConstRef ThreadLocalStoreImpl::ScopeImpl::findTextReadout(StatName name) const { + Thread::LockGuard lock(parent_.lock_); return findStatLockHeld(name, central_cache_->text_readouts_); } @@ -972,15 +1020,39 @@ void ThreadLocalStoreImpl::forEachHistogram(SizeFn f_size, StatFn f_size, StatFn f_scope) const { - Thread::LockGuard lock(lock_); + std::vector scopes; + iterateScopes([&scopes](const ScopeImplSharedPtr& scope) -> bool { + scopes.push_back(scope); + return true; + }); + if (f_size != nullptr) { - f_size(scopes_.size()); + f_size(scopes.size()); } - for (ScopeImpl* scope : scopes_) { + for (const ScopeSharedPtr& scope : scopes) { f_scope(*scope); } } +bool ThreadLocalStoreImpl::iterateScopesLockHeld( + const std::function fn) const + ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { + for (auto& iter : scopes_) { + sync_.syncPoint(ThreadLocalStoreImpl::IterateScopeSync); + + // We keep the scopes as a map from Scope* to weak_ptr so that if, + // during the iteration, the last reference to a ScopeSharedPtr is dropped, + // we can test for that here by attempting to lock the weak pointer, and + // skip those that are nullptr. 
+ const std::weak_ptr& scope = iter.second; + const ScopeImplSharedPtr& locked = scope.lock(); + if (locked != nullptr && !fn(locked)) { + return false; + } + } + return true; +} + void ThreadLocalStoreImpl::forEachSinkedCounter(SizeFn f_size, StatFn f_stat) const { alloc_.forEachSinkedCounter(f_size, f_stat); } diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 74f625eaa2b2f..6d9ad46dab1ed 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -21,7 +21,6 @@ #include "source/common/stats/utility.h" #include "absl/container/flat_hash_map.h" -#include "absl/container/flat_hash_set.h" #include "circllhist.h" namespace Envoy { @@ -145,6 +144,8 @@ using ParentHistogramImplSharedPtr = RefcountPtr; */ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRoot { public: + static const char DeleteScopeSync[]; + static const char IterateScopeSync[]; static const char MainDispatcherCleanupSync[]; ThreadLocalStoreImpl(Allocator& alloc); @@ -188,51 +189,10 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo const SymbolTable& constSymbolTable() const override { return alloc_.constSymbolTable(); } SymbolTable& symbolTable() override { return alloc_.symbolTable(); } const TagProducer& tagProducer() const { return *tag_producer_; } - CounterOptConstRef findCounter(StatName name) const override { - CounterOptConstRef found_counter; - Thread::LockGuard lock(lock_); - for (ScopeImpl* scope : scopes_) { - found_counter = scope->findCounter(name); - if (found_counter.has_value()) { - return found_counter; - } - } - return absl::nullopt; - } - GaugeOptConstRef findGauge(StatName name) const override { - GaugeOptConstRef found_gauge; - Thread::LockGuard lock(lock_); - for (ScopeImpl* scope : scopes_) { - found_gauge = scope->findGauge(name); - if (found_gauge.has_value()) { - return found_gauge; - } - } - return absl::nullopt; - } - HistogramOptConstRef 
findHistogram(StatName name) const override { - HistogramOptConstRef found_histogram; - Thread::LockGuard lock(lock_); - for (ScopeImpl* scope : scopes_) { - found_histogram = scope->findHistogram(name); - if (found_histogram.has_value()) { - return found_histogram; - } - } - return absl::nullopt; - } - TextReadoutOptConstRef findTextReadout(StatName name) const override { - TextReadoutOptConstRef found_text_readout; - Thread::LockGuard lock(lock_); - for (ScopeImpl* scope : scopes_) { - found_text_readout = scope->findTextReadout(name); - if (found_text_readout.has_value()) { - return found_text_readout; - } - } - return absl::nullopt; - } - + CounterOptConstRef findCounter(StatName name) const override; + GaugeOptConstRef findGauge(StatName name) const override; + HistogramOptConstRef findHistogram(StatName name) const override; + TextReadoutOptConstRef findTextReadout(StatName name) const override; bool iterate(const IterateFn& fn) const override { return iterHelper(fn); } bool iterate(const IterateFn& fn) const override { return iterHelper(fn); } bool iterate(const IterateFn& fn) const override { return iterHelper(fn); } @@ -388,16 +348,33 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo } bool iterate(const IterateFn& fn) const override { - return iterHelper(fn, central_cache_->counters_); + Thread::LockGuard lock(parent_.lock_); + return iterateLockHeld(fn); } bool iterate(const IterateFn& fn) const override { - return iterHelper(fn, central_cache_->gauges_); + Thread::LockGuard lock(parent_.lock_); + return iterateLockHeld(fn); } bool iterate(const IterateFn& fn) const override { - return iterHelper(fn, central_cache_->histograms_); + Thread::LockGuard lock(parent_.lock_); + return iterateLockHeld(fn); } bool iterate(const IterateFn& fn) const override { - return iterHelper(fn, central_cache_->text_readouts_); + Thread::LockGuard lock(parent_.lock_); + return iterateLockHeld(fn); + } + + bool iterateLockHeld(const IterateFn& fn) const { + 
return iterHelper(fn, centralCacheLockHeld()->counters_); + } + bool iterateLockHeld(const IterateFn& fn) const { + return iterHelper(fn, centralCacheLockHeld()->gauges_); + } + bool iterateLockHeld(const IterateFn& fn) const { + return iterHelper(fn, centralCacheLockHeld()->histograms_); + } + bool iterateLockHeld(const IterateFn& fn) const { + return iterHelper(fn, centralCacheLockHeld()->text_readouts_); } // NOTE: The find methods assume that `name` is fully-qualified. @@ -407,6 +384,8 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo HistogramOptConstRef findHistogram(StatName name) const override; TextReadoutOptConstRef findTextReadout(StatName name) const override; + HistogramOptConstRef findHistogramLockHeld(StatName name) const; + template using MakeStatFn = std::function( Allocator&, StatName name, StatName tag_extracted_name, const StatNameTagVector& tags)>; @@ -448,14 +427,43 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo template StatTypeOptConstRef findStatLockHeld(StatName name, - StatNameHashMap>& central_cache_map) const; + StatNameHashMap>& central_cache_map) const { + auto iter = central_cache_map.find(name); + if (iter == central_cache_map.end()) { + return absl::nullopt; + } + + return std::cref(*iter->second); + } StatName prefix() const override { return prefix_.statName(); } + // Returns the central cache, asserting that the parent lock is held. + // + // When a ThreadLocalStore method takes lock_ and then accesses + // scope->central_cache_, the analysis system cannot understand that the + // scope's parent_.lock_ is held, so we assert that here. + const CentralCacheEntrySharedPtr& centralCacheLockHeld() const + ABSL_ASSERT_EXCLUSIVE_LOCK(parent_.lock_) { + return central_cache_; + } + + // Returns the central cache, bypassing thread analysis. 
+ // + // This is used only when passing references to maps held in the central + // cache to safeMakeStat, which takes the lock only if those maps are + // actually referenced, due to the lookup missing the TLS cache. + const CentralCacheEntrySharedPtr& + centralCacheNoThreadAnalysis() const ABSL_NO_THREAD_SAFETY_ANALYSIS { + return central_cache_; + } + const uint64_t scope_id_; ThreadLocalStoreImpl& parent_; + + private: StatNameStorage prefix_; - mutable CentralCacheEntrySharedPtr central_cache_; + mutable CentralCacheEntrySharedPtr central_cache_ ABSL_GUARDED_BY(parent_.lock_); }; struct TlsCache : public ThreadLocal::ThreadLocalObject { @@ -476,20 +484,28 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo absl::flat_hash_map tls_histogram_cache_; }; - template bool iterHelper(StatFn fn) const { - // Note that any thread can delete a scope at any time, and so another - // thread may have initiated destruction when we enter `iterHelper`. - // However the first thing that happens is releaseScopeCrossThread, which - // takes lock_, and doesn't release it until scopes_.erase(scope) finishes. - // thus there is no race risk with iterating over scopes while another - // thread deletes them. + using ScopeImplSharedPtr = std::shared_ptr; + + /** + * Calls fn_lock_held for every scope, with lock_ held. This avoids iterate/destruct + * races for scopes. + * + * @param fn_lock_held function to be called, with lock_ held, on every scope, until + * fn_lock_held() returns false. + * @return true if the iteration completed with fn_lock_held never returning false.
+ */ + bool iterateScopes(const std::function fn_lock_held) const { Thread::LockGuard lock(lock_); - for (ScopeImpl* scope : scopes_) { - if (!scope->iterate(fn)) { - return false; - } - } - return true; + return iterateScopesLockHeld(fn_lock_held); + } + + bool iterateScopesLockHeld(const std::function fn) const + ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_); + + // The Store versions of iterate cover all the scopes in the store. + template bool iterHelper(StatFn fn) const { + return iterateScopes( + [fn](const ScopeImplSharedPtr& scope) -> bool { return scope->iterateLockHeld(fn); }); } StatName prefix() const override { return StatName(); } @@ -519,7 +535,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo using TlsCacheSlot = ThreadLocal::TypedSlotPtr; ThreadLocal::TypedSlotPtr tls_cache_; mutable Thread::MutexBasicLockable lock_; - absl::flat_hash_set scopes_ ABSL_GUARDED_BY(lock_); + absl::flat_hash_map> scopes_ ABSL_GUARDED_BY(lock_); ScopeSharedPtr default_scope_; std::list> timer_sinks_; TagProducerPtr tag_producer_; @@ -535,7 +551,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo NullHistogramImpl null_histogram_; NullTextReadoutImpl null_text_readout_; - Thread::ThreadSynchronizer sync_; + mutable Thread::ThreadSynchronizer sync_; std::atomic next_scope_id_{}; uint64_t next_histogram_id_ ABSL_GUARDED_BY(hist_mutex_) = 0; diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 62ee10b50264d..54a4f65c13888 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -1700,7 +1700,7 @@ class ThreadLocalRealThreadsTestBase : public Thread::RealThreadsTestHelper, // TODO(chaoqin-li1123): clean this up when we figure out how to free the threading resources in // RealThreadsTestHelper. 
shutdownThreading(); - exitThreads(); + exitThreads([this]() { store_.reset(); }); } void shutdownThreading() { @@ -1713,25 +1713,52 @@ class ThreadLocalRealThreadsTestBase : public Thread::RealThreadsTestHelper, }); } - void exitThreads() { - for (Event::DispatcherPtr& dispatcher : thread_dispatchers_) { - dispatcher->post([&dispatcher]() { dispatcher->exit(); }); - } + StatNamePool pool_; +}; - for (Thread::ThreadPtr& thread : threads_) { - thread->join(); - } +class OneWorkerThread : public ThreadLocalRealThreadsTestBase { +protected: + static constexpr uint32_t NumThreads = 1; + OneWorkerThread() : ThreadLocalRealThreadsTestBase(NumThreads) {} +}; - main_dispatcher_->post([this]() { - store_.reset(); - tls_.reset(); - main_dispatcher_->exit(); - }); - main_thread_->join(); - } +// Reproduces a race-condition between forEachScope and scope deletion. If we +// replace the code in ThreadLocalStoreImpl::forEachScope with this: +// +// SPELLCHECKER(off) +// Thread::LockGuard lock(lock_); +// if (f_size != nullptr) { +// f_size(scopes_.size()); +// } +// for (auto iter : scopes_) { +// if (iter.first != default_scope_.get()) { +// sync_.syncPoint(ThreadLocalStoreImpl::IterateScopeSync); +// } +// f_scope(*(iter.first)); +// } +// SPELLCHECKER(on) +// +// then we'll get a fatal exception on a weak_ptr conversion with this test. 
+TEST_F(OneWorkerThread, DeleteForEachRace) { + ScopeSharedPtr scope = store_->createScope("scope."); + std::vector scopes; - StatNamePool pool_; -}; + store_->sync().enable(); + store_->sync().waitOn(ThreadLocalStoreImpl::DeleteScopeSync); + store_->sync().waitOn(ThreadLocalStoreImpl::IterateScopeSync); + auto wait_for_worker = runOnAllWorkers([this, &scopes]() { + store_->forEachScope( + nullptr, [&scopes](const Scope& scope) { scopes.push_back(scope.getConstShared()); }); + EXPECT_EQ(1, scopes.size()); + }); + store_->sync().barrierOn(ThreadLocalStoreImpl::IterateScopeSync); + auto wait_for_main = runOnMain([&scope]() { scope.reset(); }); + store_->sync().barrierOn(ThreadLocalStoreImpl::DeleteScopeSync); + store_->sync().signal(ThreadLocalStoreImpl::IterateScopeSync); + wait_for_worker(); + store_->sync().signal(ThreadLocalStoreImpl::DeleteScopeSync); + wait_for_main(); +} class ClusterShutdownCleanupStarvationTest : public ThreadLocalRealThreadsTestBase { protected: @@ -1751,7 +1778,6 @@ class ClusterShutdownCleanupStarvationTest : public ThreadLocalRealThreadsTestBa } void createScopesIncCountersAndCleanupAllThreads() { - runOnAllWorkersBlocking([this]() { createScopesIncCountersAndCleanup(); }); } diff --git a/test/server/admin/BUILD b/test/server/admin/BUILD index be0e9e799ac69..63889ecd34d30 100644 --- a/test/server/admin/BUILD +++ b/test/server/admin/BUILD @@ -62,10 +62,12 @@ envoy_cc_test( deps = [ ":admin_instance_lib", "//source/common/stats:thread_local_store_lib", + "//source/common/thread_local:thread_local_lib", "//source/server/admin:stats_handler_lib", "//source/server/admin:utils_lib", "//test/mocks/server:admin_stream_mocks", "//test/test_common:logging_lib", + "//test/test_common:real_threads_test_helper_lib", "//test/test_common:utility_lib", ], ) diff --git a/test/server/admin/stats_handler_test.cc b/test/server/admin/stats_handler_test.cc index df88b2e99c2b8..068b19b2c1691 100644 --- a/test/server/admin/stats_handler_test.cc +++ 
b/test/server/admin/stats_handler_test.cc @@ -4,11 +4,13 @@ #include "source/common/stats/custom_stat_namespaces_impl.h" #include "source/common/stats/thread_local_store.h" #include "source/server/admin/stats_handler.h" +#include "source/server/admin/stats_request.h" #include "test/mocks/server/admin_stream.h" #include "test/mocks/server/instance.h" #include "test/server/admin/admin_instance.h" #include "test/test_common/logging.h" +#include "test/test_common/real_threads_test_helper.h" #include "test/test_common/utility.h" using testing::EndsWith; @@ -1069,6 +1071,115 @@ TEST_P(AdminStatsTest, SortedHistograms) { } } +// Sets up a test using real threads to reproduce a race between deleting scopes +// and iterating over them. +class ThreadedTest : public testing::Test { +protected: + // These values were picked by trial and error, with a log added to + // ThreadLocalStoreImpl::iterateScopesLockHeld for when locked==nullptr. The + // goal with these numbers was to maximize the number of times we'd have to + // skip over a deleted scope that fails the lock, while keeping the test + // duration under a few seconds. On one dev box, these settings allow for + // about 20 reproductions of the race in a 5 second test. 
+ static constexpr uint32_t NumThreads = 12; + static constexpr uint32_t NumScopes = 5; + static constexpr uint32_t NumStatsPerScope = 5; + static constexpr uint32_t NumIters = 20; + + ThreadedTest() + : real_threads_(NumThreads), pool_(symbol_table_), alloc_(symbol_table_), + store_(std::make_unique(alloc_)) { + for (uint32_t i = 0; i < NumStatsPerScope; ++i) { + counter_names_[i] = pool_.add(absl::StrCat("c", "_", i)); + } + for (uint32_t i = 0; i < NumScopes; ++i) { + scope_names_[i] = pool_.add(absl::StrCat("scope", "_", i)); + } + real_threads_.runOnMainBlocking([this]() { + store_->initializeThreading(real_threads_.mainDispatcher(), real_threads_.tls()); + }); + } + + ~ThreadedTest() override { + real_threads_.runOnMainBlocking([this]() { + ThreadLocal::Instance& tls = real_threads_.tls(); + if (!tls.isShutdown()) { + tls.shutdownGlobalThreading(); + } + store_->shutdownThreading(); + tls.shutdownThread(); + }); + real_threads_.exitThreads([this]() { + scopes_.clear(); + store_.reset(); + }); + } + + // Builds, or re-builds, NumScopes scopes, each of which has NumStatsPerScopes + // counters. The scopes are kept in an already-sized vector. We keep a + // fine-grained mutex for each scope just for each entry in the scope vector + // so we can have multiple threads concurrently rebuilding the scopes. 
+ void addStats() { + ASSERT_EQ(NumScopes, scopes_.size()); + for (uint32_t s = 0; s < NumScopes; ++s) { + Stats::ScopeSharedPtr scope = store_->scopeFromStatName(scope_names_[s]); + { + absl::MutexLock lock(&scope_mutexes_[s]); + scopes_[s] = scope; + } + for (Stats::StatName counter_name : counter_names_) { + scope->counterFromStatName(counter_name); + } + } + } + + void statsEndpoint() { + StatsRequest request(*store_, false, false, Utility::HistogramBucketsMode::NoBuckets, + absl::nullopt); + Http::TestResponseHeaderMapImpl response_headers; + request.start(response_headers); + Buffer::OwnedImpl data; + while (request.nextChunk(data)) { + } + for (const Buffer::RawSlice& slice : data.getRawSlices()) { + absl::string_view str(static_cast(slice.mem_), slice.len_); + // Sanity check that the /stats endpoint is doing something by counting + // newlines. + total_lines_ += std::count_if(str.begin(), str.end(), [](char c) { return c == '\n'; }); + } + } + + Thread::RealThreadsTestHelper real_threads_; + Stats::SymbolTableImpl symbol_table_; + Stats::StatNamePool pool_; + Stats::AllocatorImpl alloc_; + std::unique_ptr store_; + std::vector scopes_{NumScopes}; + absl::Mutex scope_mutexes_[NumScopes]; + std::atomic total_lines_{0}; + Stats::StatName counter_names_[NumStatsPerScope]; + Stats::StatName scope_names_[NumScopes]; +}; + +TEST_F(ThreadedTest, Threaded) { + real_threads_.runOnAllWorkersBlocking([this]() { + for (uint32_t i = 0; i < NumIters; ++i) { + addStats(); + statsEndpoint(); + } + }); + + // We expect all the constants, multiplied together, give us the expected + // number of lines. However, whenever there is an attempt to iterate over + // scopes in one thread while another thread has replaced that scope, we will + // drop some scopes and/or stats in an in-construction scope. So we just test + // here that the number of lines is between 0.5x and 1.x expected. If this + // proves flaky we can loosen this check. 
+ uint32_t expected = NumThreads * NumScopes * NumStatsPerScope * NumIters; + EXPECT_GE(expected, total_lines_); + EXPECT_LE(expected / 2, total_lines_); +} + INSTANTIATE_TEST_SUITE_P(IpVersions, AdminInstanceTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); diff --git a/test/test_common/real_threads_test_helper.cc b/test/test_common/real_threads_test_helper.cc index 3408b0c173b6e..d484a8a3eabf9 100644 --- a/test/test_common/real_threads_test_helper.cc +++ b/test/test_common/real_threads_test_helper.cc @@ -50,7 +50,7 @@ void RealThreadsTestHelper::shutdownThreading() { }); } -void RealThreadsTestHelper::exitThreads() { +void RealThreadsTestHelper::exitThreads(std::function cleanup) { for (Event::DispatcherPtr& dispatcher : thread_dispatchers_) { dispatcher->post([&dispatcher]() { dispatcher->exit(); }); } @@ -59,7 +59,10 @@ void RealThreadsTestHelper::exitThreads() { thread->join(); } - main_dispatcher_->post([this]() { + main_dispatcher_->post([this, cleanup]() { + if (cleanup != nullptr) { + cleanup(); + } tls_.reset(); main_dispatcher_->exit(); }); @@ -67,19 +70,33 @@ void RealThreadsTestHelper::exitThreads() { } void RealThreadsTestHelper::runOnAllWorkersBlocking(std::function work) { - absl::Barrier start_barrier(num_threads_); - BlockingBarrier blocking_barrier(num_threads_); + runOnAllWorkers(work)(); +} + +std::function RealThreadsTestHelper::runOnAllWorkers(std::function work) { + auto start_barrier = std::make_shared(num_threads_); + auto blocking_barrier = std::make_shared(num_threads_); for (Event::DispatcherPtr& thread_dispatcher : thread_dispatchers_) { - thread_dispatcher->post(blocking_barrier.run([work, &start_barrier]() { - start_barrier.Block(); + thread_dispatcher->post(blocking_barrier->run([work, start_barrier]() { + start_barrier->Block(); work(); })); } + + // When run, this closure will block on the destruction of the blocking barrier. 
+ auto waiter = [blocking_barrier]() {}; + blocking_barrier.reset(); + return waiter; } -void RealThreadsTestHelper::runOnMainBlocking(std::function work) { - BlockingBarrier blocking_barrier(1); - main_dispatcher_->post(blocking_barrier.run([work]() { work(); })); +void RealThreadsTestHelper::runOnMainBlocking(std::function work) { runOnMain(work)(); } + +std::function RealThreadsTestHelper::runOnMain(std::function work) { + auto blocking_barrier = std::make_shared(1); + main_dispatcher_->post(blocking_barrier->run([work]() { work(); })); + auto waiter = [blocking_barrier]() {}; + blocking_barrier.reset(); + return waiter; } void RealThreadsTestHelper::mainDispatchBlock() { diff --git a/test/test_common/real_threads_test_helper.h b/test/test_common/real_threads_test_helper.h index 12444777efbd4..4d591b1dc639b 100644 --- a/test/test_common/real_threads_test_helper.h +++ b/test/test_common/real_threads_test_helper.h @@ -7,7 +7,7 @@ namespace Envoy { namespace Thread { class RealThreadsTestHelper { -protected: +public: // Helper class to block on a number of multi-threaded operations occurring. class BlockingBarrier { public: @@ -40,7 +40,7 @@ class RealThreadsTestHelper { // Shutdown thread local instance. void shutdownThreading(); // Post exit signal and wait for main thread and worker threads to join. - void exitThreads(); + void exitThreads(std::function cleanup = nullptr); // Run the callback in all the workers, block until the callback has finished in all threads. void runOnAllWorkersBlocking(std::function work); // Run the callback in main thread, block until the callback has been executed in main thread. @@ -52,12 +52,21 @@ class RealThreadsTestHelper { // executed. void tlsBlock(); - ThreadLocal::Instance& tls() { return *tls_; } + // Runs a function on all workers, and returns a lambda which blocks waiting + // for all the workers to finish. 
+ std::function runOnAllWorkers(std::function work); + + // Runs a function on the main thread, and returns a lambda which blocks + // waiting for the main thread to finish. + std::function runOnMain(std::function work); + ThreadLocal::Instance& tls() { return *tls_; } Api::Api& api() { return *api_; } + Event::Dispatcher& mainDispatcher() { return *main_dispatcher_; } // TODO(chaoqin-li1123): make these variables private when we figure out how to clean up the // threading resources inside the helper class. +protected: Api::ApiPtr api_; Event::DispatcherPtr main_dispatcher_; std::vector thread_dispatchers_;