diff --git a/source/common/stats/allocator_impl.h b/source/common/stats/allocator_impl.h index 3242d0de5fefc..469484866f18a 100644 --- a/source/common/stats/allocator_impl.h +++ b/source/common/stats/allocator_impl.h @@ -58,29 +58,10 @@ class AllocatorImpl : public Allocator { friend class TextReadoutImpl; friend class NotifyingAllocatorImpl; - struct HeapStatHash { - using is_transparent = void; // NOLINT(readability-identifier-naming) - size_t operator()(const Metric* a) const { return a->statName().hash(); } - size_t operator()(StatName a) const { return a.hash(); } - }; - - struct HeapStatCompare { - using is_transparent = void; // NOLINT(readability-identifier-naming) - bool operator()(const Metric* a, const Metric* b) const { - return a->statName() == b->statName(); - } - bool operator()(const Metric* a, StatName b) const { return a->statName() == b; } - }; - void removeCounterFromSetLockHeld(Counter* counter) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); void removeGaugeFromSetLockHeld(Gauge* gauge) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); void removeTextReadoutFromSetLockHeld(Counter* counter) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); - // An unordered set of HeapStatData pointers which keys off the key() - // field in each object. This necessitates a custom comparator and hasher, which key off of the - // StatNamePtr's own StatNamePtrHash and StatNamePtrCompare operators. - template - using StatSet = absl::flat_hash_set; StatSet counters_ ABSL_GUARDED_BY(mutex_); StatSet gauges_ ABSL_GUARDED_BY(mutex_); StatSet text_readouts_ ABSL_GUARDED_BY(mutex_); diff --git a/source/common/stats/metric_impl.h b/source/common/stats/metric_impl.h index c923395b992d4..52b577230fd3b 100644 --- a/source/common/stats/metric_impl.h +++ b/source/common/stats/metric_impl.h @@ -32,10 +32,35 @@ class MetricHelper { void iterateTagStatNames(const Metric::TagStatNameIterFn& fn) const; void clear(SymbolTable& symbol_table) { stat_names_.clear(symbol_table); } + // Hasher for metrics. 
+ struct Hash { + using is_transparent = void; // NOLINT(readability-identifier-naming) + size_t operator()(const Metric* a) const { return a->statName().hash(); } + size_t operator()(StatName a) const { return a.hash(); } + }; + + // Comparator for metrics. + struct Compare { + using is_transparent = void; // NOLINT(readability-identifier-naming) + bool operator()(const Metric* a, const Metric* b) const { + return a->statName() == b->statName(); + } + bool operator()(const Metric* a, StatName b) const { return a->statName() == b; } + }; + private: StatNameList stat_names_; }; +// An unordered set of stat pointers, which keys off Metric::statName(). +// This necessitates a custom comparator and hasher, using the StatNamePtr's +// own StatNamePtrHash and StatNamePtrCompare operators. +// +// This is used by AllocatorImpl for counters, gauges, and text-readouts, and +// is also used by thread_local_store.h for histograms. +template +using StatSet = absl::flat_hash_set; + /** * Partial implementation of the Metric interface on behalf of Counters, Gauges, * and Histograms. It leaves symbolTable() unimplemented so that implementations diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 697760ed1a4b8..54d0c78eba9bd 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -63,12 +63,22 @@ void ThreadLocalStoreImpl::setStatsMatcher(StatsMatcherPtr&& stats_matcher) { // in the default_scope. There should be no requests, so there will // be no copies in TLS caches. 
Thread::LockGuard lock(lock_); + const uint32_t first_histogram_index = deleted_histograms_.size(); for (ScopeImpl* scope : scopes_) { removeRejectedStats(scope->central_cache_->counters_, deleted_counters_); removeRejectedStats(scope->central_cache_->gauges_, deleted_gauges_); removeRejectedStats(scope->central_cache_->histograms_, deleted_histograms_); removeRejectedStats(scope->central_cache_->text_readouts_, deleted_text_readouts_); } + + // Remove any newly rejected histograms from histogram_set_. + { + Thread::LockGuard hist_lock(hist_mutex_); + for (uint32_t i = first_histogram_index; i < deleted_histograms_.size(); ++i) { + uint32_t erased = histogram_set_.erase(deleted_histograms_[i].get()); + ASSERT(erased == 1); + } + } } template @@ -160,16 +170,11 @@ std::vector ThreadLocalStoreImpl::textReadouts() const { std::vector ThreadLocalStoreImpl::histograms() const { std::vector ret; - Thread::LockGuard lock(lock_); - // TODO(ramaraochavali): As histograms don't share storage, there is a chance of duplicate names - // here. We need to create global storage for histograms similar to how we have a central storage - // in shared memory for counters/gauges. In the interim, no de-dup is done here. This may result - // in histograms with duplicate names, but until shared storage is implemented it's ultimately - // less confusing for users who have such configs. - for (ScopeImpl* scope : scopes_) { - for (const auto& name_histogram_pair : scope->central_cache_->histograms_) { - const ParentHistogramSharedPtr& parent_hist = name_histogram_pair.second; - ret.push_back(parent_hist); + Thread::LockGuard lock(hist_mutex_); + { + ret.reserve(histogram_set_.size()); + for (const auto& histogram_ptr : histogram_set_) { + ret.emplace_back(histogram_ptr); } } @@ -189,6 +194,11 @@ void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_di void ThreadLocalStoreImpl::shutdownThreading() { // This will block both future cache fills as well as cache flushes. 
shutting_down_ = true; + Thread::LockGuard lock(hist_mutex_); + for (ParentHistogramImpl* histogram : histogram_set_) { + histogram->setShuttingDown(true); + } + histogram_set_.clear(); } void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) { @@ -197,12 +207,9 @@ void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) { merge_in_progress_ = true; tls_->runOnAllThreads( [this]() -> void { - for (const auto& scope : tls_->getTyped().scope_cache_) { - const TlsCacheEntry& tls_cache_entry = scope.second; - for (const auto& name_histogram_pair : tls_cache_entry.histograms_) { - const TlsHistogramSharedPtr& tls_hist = name_histogram_pair.second; - tls_hist->beginMerge(); - } + for (const auto& id_hist : tls_->getTyped().tls_histogram_cache_) { + const TlsHistogramSharedPtr& tls_hist = id_hist.second; + tls_hist->beginMerge(); } }, [this, merge_complete_cb]() -> void { mergeInternal(merge_complete_cb); }); @@ -257,6 +264,10 @@ void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) { if (!shutting_down_ && main_thread_dispatcher_) { const uint64_t scope_id = scope->scope_id_; lock.release(); + + // TODO(jmarantz): consider batching all the scope IDs that should be + // cleared from TLS caches to reduce bursts of runOnAllThreads on a large + // config update. See the pattern below used for histograms. main_thread_dispatcher_->post([this, central_cache, scope_id]() { sync_.syncPoint(MainDispatcherCleanupSync); clearScopeFromCaches(scope_id, central_cache); @@ -264,12 +275,27 @@ void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) { } } +void ThreadLocalStoreImpl::releaseHistogramCrossThread(uint64_t histogram_id) { + // This can happen from any thread. We post() back to the main thread which will initiate the + // cache flush operation. 
+ if (!shutting_down_ && main_thread_dispatcher_) { + main_thread_dispatcher_->post( + [this, histogram_id]() { clearHistogramFromCaches(histogram_id); }); + } +} + ThreadLocalStoreImpl::TlsCacheEntry& ThreadLocalStoreImpl::TlsCache::insertScope(uint64_t scope_id) { return scope_cache_[scope_id]; } void ThreadLocalStoreImpl::TlsCache::eraseScope(uint64_t scope_id) { scope_cache_.erase(scope_id); } +void ThreadLocalStoreImpl::TlsCache::eraseHistogram(uint64_t histogram_id) { + // This is called for every histogram in every thread, even though the + // histogram may not have been cached in each thread yet. So we don't + // want to check whether the erase() call erased anything. + tls_histogram_cache_.erase(histogram_id); +} void ThreadLocalStoreImpl::clearScopeFromCaches(uint64_t scope_id, CentralCacheEntrySharedPtr central_cache) { @@ -283,6 +309,22 @@ void ThreadLocalStoreImpl::clearScopeFromCaches(uint64_t scope_id, } } +void ThreadLocalStoreImpl::clearHistogramFromCaches(uint64_t histogram_id) { + // If we are shutting down we no longer perform cache flushes as workers may be shutting down + // at the same time. + if (!shutting_down_) { + // Perform a cache flush on all threads. + // + // TODO(jmarantz): If this cross-thread posting proves to be a performance + // bottleneck, + // https://gist.github.com/jmarantz/838cb6de7e74c0970ea6b63eded0139a + // contains a patch that will implement batching together to clear multiple + // histograms. 
+ tls_->runOnAllThreads( + [this, histogram_id]() { tls_->getTyped().eraseHistogram(histogram_id); }); + } +} + ThreadLocalStoreImpl::ScopeImpl::ScopeImpl(ThreadLocalStoreImpl& parent, const std::string& prefix) : scope_id_(parent.next_scope_id_++), parent_(parent), prefix_(Utility::sanitizeStatsName(prefix), parent.alloc_.symbolTable()), @@ -566,9 +608,23 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogramFromStatNameWithTags( [&buckets, this](absl::string_view stat_name) { buckets = &parent_.histogram_settings_->buckets(stat_name); }); - RefcountPtr stat(new ParentHistogramImpl( - final_stat_name, unit, parent_, *this, tag_helper.tagExtractedName(), - tag_helper.statNameTags(), *buckets)); + + RefcountPtr stat; + { + Thread::LockGuard lock(parent_.hist_mutex_); + auto iter = parent_.histogram_set_.find(final_stat_name); + if (iter != parent_.histogram_set_.end()) { + stat = RefcountPtr(*iter); + } else { + stat = new ParentHistogramImpl(final_stat_name, unit, parent_, + tag_helper.tagExtractedName(), tag_helper.statNameTags(), + *buckets, parent_.next_histogram_id_++); + if (!parent_.shutting_down_) { + parent_.histogram_set_.insert(stat.get()); + } + } + } + central_ref = ¢ral_cache_->histograms_[stat->statName()]; *central_ref = stat; } @@ -639,34 +695,34 @@ TextReadoutOptConstRef ThreadLocalStoreImpl::ScopeImpl::findTextReadout(StatName return findStatLockHeld(name, central_cache_->text_readouts_); } -Histogram& ThreadLocalStoreImpl::ScopeImpl::tlsHistogram(StatName name, - ParentHistogramImpl& parent) { +Histogram& ThreadLocalStoreImpl::tlsHistogram(ParentHistogramImpl& parent, uint64_t id) { // tlsHistogram() is generally not called for a histogram that is rejected by // the matcher, so no further rejection-checking is needed at this level. // TlsHistogram inherits its reject/accept status from ParentHistogram. // See comments in counterFromStatName() which explains the logic here. 
- StatNameHashMap* tls_cache = nullptr; - if (!parent_.shutting_down_ && parent_.tls_) { - tls_cache = &parent_.tls_->getTyped().scope_cache_[this->scope_id_].histograms_; - auto iter = tls_cache->find(name); - if (iter != tls_cache->end()) { - return *iter->second; + TlsHistogramSharedPtr* tls_histogram = nullptr; + if (!shutting_down_ && tls_ != nullptr) { + TlsCache& tls_cache = tls_->getTyped(); + tls_histogram = &tls_cache.tls_histogram_cache_[id]; + if (*tls_histogram != nullptr) { + return **tls_histogram; } } - StatNameTagHelper tag_helper(parent_, name, absl::nullopt); + StatNameTagHelper tag_helper(*this, parent.statName(), absl::nullopt); TlsHistogramSharedPtr hist_tls_ptr( - new ThreadLocalHistogramImpl(name, parent.unit(), tag_helper.tagExtractedName(), + new ThreadLocalHistogramImpl(parent.statName(), parent.unit(), tag_helper.tagExtractedName(), tag_helper.statNameTags(), symbolTable())); parent.addTlsHistogram(hist_tls_ptr); - if (tls_cache) { - tls_cache->insert(std::make_pair(hist_tls_ptr->statName(), hist_tls_ptr)); + if (tls_histogram != nullptr) { + *tls_histogram = hist_tls_ptr; } + return *hist_tls_ptr; } @@ -682,7 +738,7 @@ ThreadLocalHistogramImpl::ThreadLocalHistogramImpl(StatName name, Histogram::Uni } ThreadLocalHistogramImpl::~ThreadLocalHistogramImpl() { - MetricImpl::clear(symbolTable()); + MetricImpl::clear(symbol_table_); hist_free(histograms_[0]); hist_free(histograms_[1]); } @@ -699,28 +755,78 @@ void ThreadLocalHistogramImpl::merge(histogram_t* target) { hist_clear(*other_histogram); } -ParentHistogramImpl::ParentHistogramImpl(StatName name, Histogram::Unit unit, Store& parent, - TlsScope& tls_scope, StatName tag_extracted_name, +ParentHistogramImpl::ParentHistogramImpl(StatName name, Histogram::Unit unit, + ThreadLocalStoreImpl& thread_local_store, + StatName tag_extracted_name, const StatNameTagVector& stat_name_tags, - ConstSupportedBuckets& supported_buckets) - : MetricImpl(name, tag_extracted_name, stat_name_tags, 
parent.symbolTable()), unit_(unit), - parent_(parent), tls_scope_(tls_scope), interval_histogram_(hist_alloc()), + ConstSupportedBuckets& supported_buckets, uint64_t id) + : MetricImpl(name, tag_extracted_name, stat_name_tags, thread_local_store.symbolTable()), + unit_(unit), thread_local_store_(thread_local_store), interval_histogram_(hist_alloc()), cumulative_histogram_(hist_alloc()), interval_statistics_(interval_histogram_, supported_buckets), - cumulative_statistics_(cumulative_histogram_, supported_buckets), merged_(false) {} + cumulative_statistics_(cumulative_histogram_, supported_buckets), merged_(false), id_(id) {} ParentHistogramImpl::~ParentHistogramImpl() { - MetricImpl::clear(parent_.symbolTable()); + thread_local_store_.releaseHistogramCrossThread(id_); + ASSERT(ref_count_ == 0); + MetricImpl::clear(thread_local_store_.symbolTable()); hist_free(interval_histogram_); hist_free(cumulative_histogram_); } +void ParentHistogramImpl::incRefCount() { ++ref_count_; } + +bool ParentHistogramImpl::decRefCount() { + bool ret; + if (shutting_down_) { + // When shutting down, we cannot reference thread_local_store_, as + // histograms can outlive the store. So we decrement the ref-count without + // the stores' lock. We will not be removing the object from the store's + // histogram map in this scenario, as the set was cleared during shutdown, + // and will not be repopulated in histogramFromStatNameWithTags after + // initiating shutdown. + ret = --ref_count_ == 0; + } else { + // We delegate to the Store object to decrement the ref-count so it can hold + // the lock to the map. If we don't hold a lock, another thread may + // simultaneously try to allocate the same name'd histogram after we + // decrement it, and we'll wind up with a dtor/update race. To avoid this we + // must hold the lock until the histogram is removed from the map. + // + // See also StatsSharedImpl::decRefCount() in allocator_impl.cc, which has + // the same issue. 
+ ret = thread_local_store_.decHistogramRefCount(*this, ref_count_); + } + return ret; +} + +bool ThreadLocalStoreImpl::decHistogramRefCount(ParentHistogramImpl& hist, + std::atomic& ref_count) { + // We must hold the store's histogram lock when decrementing the + // refcount. Otherwise another thread may simultaneously try to allocate the + // same name'd stat after we decrement it, and we'll wind up with a + // dtor/update race. To avoid this we must hold the lock until the stat is + // removed from the map. + Thread::LockGuard lock(hist_mutex_); + ASSERT(ref_count >= 1); + if (--ref_count == 0) { + if (!shutting_down_) { + const size_t count = histogram_set_.erase(hist.statName()); + ASSERT(shutting_down_ || count == 1); + } + return true; + } + return false; +} + +SymbolTable& ParentHistogramImpl::symbolTable() { return thread_local_store_.symbolTable(); } + Histogram::Unit ParentHistogramImpl::unit() const { return unit_; } void ParentHistogramImpl::recordValue(uint64_t value) { - Histogram& tls_histogram = tls_scope_.tlsHistogram(statName(), *this); + Histogram& tls_histogram = thread_local_store_.tlsHistogram(*this, id_); tls_histogram.recordValue(value); - parent_.deliverHistogramToSinks(*this, value); + thread_local_store_.deliverHistogramToSinks(*this, value); } bool ParentHistogramImpl::used() const { diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 23ce40e5fc15b..6c3a3c0e8b9d2 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -74,16 +74,16 @@ class ThreadLocalHistogramImpl : public HistogramImplHelper { using TlsHistogramSharedPtr = RefcountPtr; -class TlsScope; +class ThreadLocalStoreImpl; /** * Log Linear Histogram implementation that is stored in the main thread. 
*/ class ParentHistogramImpl : public MetricImpl { public: - ParentHistogramImpl(StatName name, Histogram::Unit unit, Store& parent, TlsScope& tls_scope, + ParentHistogramImpl(StatName name, Histogram::Unit unit, ThreadLocalStoreImpl& parent, StatName tag_extracted_name, const StatNameTagVector& stat_name_tags, - ConstSupportedBuckets& supported_buckets); + ConstSupportedBuckets& supported_buckets, uint64_t id); ~ParentHistogramImpl() override; void addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr); @@ -108,20 +108,23 @@ class ParentHistogramImpl : public MetricImpl { const std::string bucketSummary() const override; // Stats::Metric - SymbolTable& symbolTable() override { return parent_.symbolTable(); } + SymbolTable& symbolTable() override; bool used() const override; // RefcountInterface - void incRefCount() override { refcount_helper_.incRefCount(); } - bool decRefCount() override { return refcount_helper_.decRefCount(); } - uint32_t use_count() const override { return refcount_helper_.use_count(); } + void incRefCount() override; + bool decRefCount() override; + uint32_t use_count() const override { return ref_count_; } + + // Indicates that the ThreadLocalStore is shutting down, so no need to clear its histogram_set_. 
+ void setShuttingDown(bool shutting_down) { shutting_down_ = shutting_down; } + bool shuttingDown() const { return shutting_down_; } private: bool usedLockHeld() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(merge_lock_); Histogram::Unit unit_; - Store& parent_; - TlsScope& tls_scope_; + ThreadLocalStoreImpl& thread_local_store_; histogram_t* interval_histogram_; histogram_t* cumulative_histogram_; HistogramStatisticsImpl interval_statistics_; @@ -129,27 +132,13 @@ class ParentHistogramImpl : public MetricImpl { mutable Thread::MutexBasicLockable merge_lock_; std::list tls_histograms_ ABSL_GUARDED_BY(merge_lock_); bool merged_; - RefcountHelper refcount_helper_; + std::atomic shutting_down_{false}; + std::atomic ref_count_{0}; + const uint64_t id_; // Index into TlsCache::histogram_cache_. }; using ParentHistogramImplSharedPtr = RefcountPtr; -/** - * Class used to create ThreadLocalHistogram in the scope. - */ -class TlsScope : public Scope { -public: - ~TlsScope() override = default; - - // TODO(ramaraochavali): Allow direct TLS access for the advanced consumers. - /** - * @return a ThreadLocalHistogram within the scope's namespace. - * @param name name of the histogram with scope prefix attached. - * @param parent the parent histogram. - */ - virtual Histogram& tlsHistogram(StatName name, ParentHistogramImpl& parent) PURE; -}; - /** * Store implementation with thread local caching. For design details see * https://github.com/envoyproxy/envoy/blob/master/source/docs/stats.md @@ -266,6 +255,8 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo void shutdownThreading() override; void mergeHistograms(PostMergeCb merge_cb) override; + Histogram& tlsHistogram(ParentHistogramImpl& parent, uint64_t id); + /** * @return a thread synchronizer object used for controlling thread behavior in tests. 
*/ @@ -276,7 +267,12 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo */ const StatNameSet& wellKnownTags() const { return *well_known_tags_; } + bool decHistogramRefCount(ParentHistogramImpl& histogram, std::atomic& ref_count); + void releaseHistogramCrossThread(uint64_t histogram_id); + private: + friend class ThreadLocalStoreTestingPeer; + template using StatRefMap = StatNameHashMap>; struct TlsCacheEntry { @@ -288,9 +284,18 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo StatRefMap gauges_; StatRefMap text_readouts_; - // The histogram objects are not shared with the central cache, and don't - // require taking a lock when decrementing their ref-count. - StatNameHashMap histograms_; + // Histograms also require holding a mutex while decrementing reference + // counts. The only difference from other stats is that the histogram_set_ + // lives in the ThreadLocalStore object, rather than in + // AllocatorImpl. Histograms are removed from that set when all scopes + // referencing the histogram are dropped. Each ParentHistogram has a unique + // index, which is not re-used during the process lifetime. + // + // There is also a tls_histogram_cache_ in the TlsCache object, which is + // not tied to a scope. It maps from parent histogram's unique index to + // a TlsHistogram. This enables continuity between same-named histograms + // in same-named scopes. That scenario is common when re-creating scopes in + // response to xDS. StatNameHashMap parent_histograms_; // We keep a TLS cache of rejected stat names. 
This costs memory, but @@ -315,7 +320,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo }; using CentralCacheEntrySharedPtr = RefcountPtr; - struct ScopeImpl : public TlsScope { + struct ScopeImpl : public Scope { ScopeImpl(ThreadLocalStoreImpl& parent, const std::string& prefix); ~ScopeImpl() override; @@ -328,7 +333,6 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo Histogram& histogramFromStatNameWithTags(const StatName& name, StatNameTagVectorOptConstRef tags, Histogram::Unit unit) override; - Histogram& tlsHistogram(StatName name, ParentHistogramImpl& parent) override; TextReadout& textReadoutFromStatNameWithTags(const StatName& name, StatNameTagVectorOptConstRef tags) override; ScopePtr createScope(const std::string& name) override { @@ -436,6 +440,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo struct TlsCache : public ThreadLocal::ThreadLocalObject { TlsCacheEntry& insertScope(uint64_t scope_id); void eraseScope(uint64_t scope_id); + void eraseHistogram(uint64_t histogram); // The TLS scope cache is keyed by scope ID. This is used to avoid complex circular references // during scope destruction. An ID is required vs. using the address of the scope pointer @@ -445,6 +450,9 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo // store. See the overview for more information. This complexity is required for lockless // operation in the fast path. absl::flat_hash_map scope_cache_; + + // Maps from histogram ID (monotonically increasing) to a TLS histogram. 
+ absl::flat_hash_map tls_histogram_cache_; }; template bool iterHelper(StatFn fn) const { @@ -459,6 +467,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo std::string getTagsForName(const std::string& name, TagVector& tags) const; void clearScopeFromCaches(uint64_t scope_id, CentralCacheEntrySharedPtr central_cache); + void clearHistogramFromCaches(uint64_t histogram_id); void releaseScopeCrossThread(ScopeImpl* scope); void mergeInternal(PostMergeCb merge_cb); bool rejects(StatName name) const; @@ -496,15 +505,19 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo // It seems like it would be better to have each client that expects a stat // to exist to hold it as (e.g.) a CounterSharedPtr rather than a Counter& // but that would be fairly complex to change. - std::vector deleted_counters_; - std::vector deleted_gauges_; - std::vector deleted_histograms_; - std::vector deleted_text_readouts_; + std::vector deleted_counters_ ABSL_GUARDED_BY(lock_); + std::vector deleted_gauges_ ABSL_GUARDED_BY(lock_); + std::vector deleted_histograms_ ABSL_GUARDED_BY(lock_); + std::vector deleted_text_readouts_ ABSL_GUARDED_BY(lock_); Thread::ThreadSynchronizer sync_; std::atomic next_scope_id_{}; + uint64_t next_histogram_id_ ABSL_GUARDED_BY(hist_mutex_) = 0; StatNameSetPtr well_known_tags_; + + mutable Thread::MutexBasicLockable hist_mutex_; + StatSet histogram_set_ ABSL_GUARDED_BY(hist_mutex_); }; using ThreadLocalStoreImplPtr = std::unique_ptr; diff --git a/source/docs/stats.md b/source/docs/stats.md index 43be6992146c3..f80d1b46932f4 100644 --- a/source/docs/stats.md +++ b/source/docs/stats.md @@ -16,7 +16,7 @@ values, they are passed from parent to child in an RPC protocol. They were previously held in shared memory, which imposed various restrictions. Unlike the shared memory implementation, the RPC passing *requires a mode-bit specified when constructing gauges indicating whether it should be accumulated across hot-restarts*. 
- + ## Performance and Thread Local Storage A key tenant of the Envoy architecture is high performance on machines with @@ -77,6 +77,18 @@ followed. accumulates in to *interval* histograms. * Finally the main *interval* histogram is merged to *cumulative* histogram. +`ParentHistogram`s are held weakly a set in ThreadLocalStore. Like other stats, +they keep an embedded reference count and are removed from the set and destroyed +when the last strong reference disappears. Consequently, we must hold a lock for +the set when decrementing histogram reference counts. A similar process occurs for +other types of stats, but in those cases it is taken care of in `AllocatorImpl`. +There are strong references to `ParentHistograms` in TlsCacheEntry::parent_histograms_. + +Thread-local `TlsHistogram`s are created on behalf of a `ParentHistogram` +whenever accessed from a worker thread. They are strongly referenced in the +`ParentHistogram` as well as in a cache in the `ThreadLocalStore`, to help +maintain data continuity as scopes are re-created during operation. + ## Stat naming infrastructure and memory consumption Stat names are replicated in several places in various forms. diff --git a/source/server/admin/stats_handler.cc b/source/server/admin/stats_handler.cc index 753774f59dc97..e64fd878a8fb4 100644 --- a/source/server/admin/stats_handler.cc +++ b/source/server/admin/stats_handler.cc @@ -121,13 +121,11 @@ Http::Code StatsHandler::handlerStats(absl::string_view url, for (const auto& stat : all_stats) { response.add(fmt::format("{}: {}\n", stat.first, stat.second)); } - // TODO(ramaraochavali): See the comment in ThreadLocalStoreImpl::histograms() for why we use a - // multimap here. This makes sure that duplicate histograms get output. When shared storage is - // implemented this can be switched back to a normal map. 
- std::multimap all_histograms; + std::map all_histograms; for (const Stats::ParentHistogramSharedPtr& histogram : server_.stats().histograms()) { if (shouldShowMetric(*histogram, used_only, regex)) { - all_histograms.emplace(histogram->name(), histogram->quantileSummary()); + auto insert = all_histograms.emplace(histogram->name(), histogram->quantileSummary()); + ASSERT(insert.second); // No duplicates expected. } } for (const auto& histogram : all_histograms) { diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 726f32174ae84..135c6b424097e 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -29,6 +29,7 @@ #include "gtest/gtest.h" using testing::_; +using testing::HasSubstr; using testing::InSequence; using testing::NiceMock; using testing::Ref; @@ -39,6 +40,30 @@ namespace Stats { const uint64_t MaxStatNameLength = 127; +class ThreadLocalStoreTestingPeer { +public: + // Calculates the number of TLS histograms across all threads. This requires + // dispatching to all threads and blocking on their completion, and is exposed + // as a testing peer to enable tests that ensure that TLS histograms don't + // leak. + // + // Note that this must be called from the "main thread", which has different + // implications for unit tests that use real threads vs mocks. The easiest way + // to capture this in a general purpose helper is to use a callback to convey + // the resultant sum. 
+ static void numTlsHistograms(ThreadLocalStoreImpl& thread_local_store_impl, + const std::function& num_tls_hist_cb) { + auto num_tls_histograms = std::make_shared>(0); + thread_local_store_impl.tls_->runOnAllThreads( + [&thread_local_store_impl, num_tls_histograms]() { + auto& tls_cache = + thread_local_store_impl.tls_->getTyped(); + *num_tls_histograms += tls_cache.tls_histogram_cache_.size(); + }, + [num_tls_hist_cb, num_tls_histograms]() { num_tls_hist_cb(*num_tls_histograms); }); + } +}; + class StatsThreadLocalStoreTest : public testing::Test { public: StatsThreadLocalStoreTest() @@ -52,6 +77,21 @@ class StatsThreadLocalStoreTest : public testing::Test { store_->addSink(sink_); } + uint32_t numTlsHistograms() { + uint32_t num_tls_histograms; + absl::Mutex mutex; + bool done = false; + ThreadLocalStoreTestingPeer::numTlsHistograms( + *store_, [&mutex, &done, &num_tls_histograms](uint32_t num) { + absl::MutexLock lock(&mutex); + num_tls_histograms = num; + done = true; + }); + absl::MutexLock lock(&mutex); + mutex.Await(absl::Condition(&done)); + return num_tls_histograms; + } + SymbolTablePtr symbol_table_; NiceMock main_thread_dispatcher_; NiceMock tls_; @@ -381,6 +421,52 @@ TEST_F(StatsThreadLocalStoreTest, BasicScope) { store_->shutdownThreading(); scope1->deliverHistogramToSinks(h1, 100); scope1->deliverHistogramToSinks(h2, 200); + scope1.reset(); + tls_.shutdownThread(); +} + +TEST_F(StatsThreadLocalStoreTest, HistogramScopeOverlap) { + InSequence s; + store_->initializeThreading(main_thread_dispatcher_, tls_); + + // Creating two scopes with the same name gets you two distinct scope objects. + ScopePtr scope1 = store_->createScope("scope."); + ScopePtr scope2 = store_->createScope("scope."); + EXPECT_NE(scope1, scope2); + + EXPECT_EQ(0, store_->histograms().size()); + EXPECT_EQ(0, numTlsHistograms()); + + // However, stats created in the two same-named scopes will be the same objects. 
+ Counter& counter = scope1->counterFromString("counter"); + EXPECT_EQ(&counter, &scope2->counterFromString("counter")); + Gauge& gauge = scope1->gaugeFromString("gauge", Gauge::ImportMode::Accumulate); + EXPECT_EQ(&gauge, &scope2->gaugeFromString("gauge", Gauge::ImportMode::Accumulate)); + TextReadout& text_readout = scope1->textReadoutFromString("tr"); + EXPECT_EQ(&text_readout, &scope2->textReadoutFromString("tr")); + Histogram& histogram = scope1->histogramFromString("histogram", Histogram::Unit::Unspecified); + EXPECT_EQ(&histogram, &scope2->histogramFromString("histogram", Histogram::Unit::Unspecified)); + + // The histogram was created in scope1, which can now be destroyed. But the + // histogram is kept alive by scope2. + EXPECT_CALL(sink_, onHistogramComplete(Ref(histogram), 100)); + histogram.recordValue(100); + EXPECT_EQ(1, store_->histograms().size()); + EXPECT_EQ(1, numTlsHistograms()); + scope1.reset(); + EXPECT_EQ(1, store_->histograms().size()); + EXPECT_EQ(1, numTlsHistograms()); + EXPECT_CALL(sink_, onHistogramComplete(Ref(histogram), 200)); + histogram.recordValue(200); + EXPECT_EQ(&histogram, &scope2->histogramFromString("histogram", Histogram::Unit::Unspecified)); + scope2.reset(); + EXPECT_EQ(0, store_->histograms().size()); + EXPECT_EQ(0, numTlsHistograms()); + + store_->shutdownThreading(); + + store_->histogramFromString("histogram_after_shutdown", Histogram::Unit::Unspecified); + tls_.shutdownThread(); } @@ -1102,7 +1188,7 @@ TEST_F(StatsThreadLocalStoreTestNoFixture, MemoryWithTlsFakeSymbolTable) { TestUtil::MemoryTest memory_test; TestUtil::forEachSampleStat( 100, [this](absl::string_view name) { store_->counterFromString(std::string(name)); }); - EXPECT_MEMORY_EQ(memory_test.consumedBytes(), 1498160); // Apr 8, 2020 + EXPECT_MEMORY_EQ(memory_test.consumedBytes(), 1498128); // July 30, 2020 EXPECT_MEMORY_LE(memory_test.consumedBytes(), 1.6 * million_); } @@ -1122,7 +1208,7 @@ TEST_F(StatsThreadLocalStoreTestNoFixture, 
MemoryWithTlsRealSymbolTable) { TestUtil::MemoryTest memory_test; TestUtil::forEachSampleStat( 100, [this](absl::string_view name) { store_->counterFromString(std::string(name)); }); - EXPECT_MEMORY_EQ(memory_test.consumedBytes(), 827664); // July 2, 2020 + EXPECT_MEMORY_EQ(memory_test.consumedBytes(), 827632); // July 20, 2020 EXPECT_MEMORY_LE(memory_test.consumedBytes(), 0.9 * million_); } @@ -1378,9 +1464,8 @@ TEST_F(HistogramTest, ParentHistogramBucketSummary) { parent_histogram->bucketSummary()); } -class ClusterShutdownCleanupStarvationTest : public ThreadLocalStoreNoMocksTestBase { -public: - static constexpr uint32_t NumThreads = 2; +class ThreadLocalRealThreadsTestBase : public ThreadLocalStoreNoMocksTestBase { +protected: static constexpr uint32_t NumScopes = 1000; static constexpr uint32_t NumIters = 35; @@ -1416,18 +1501,17 @@ class ClusterShutdownCleanupStarvationTest : public ThreadLocalStoreNoMocksTestB absl::BlockingCounter blocking_counter_; }; - ClusterShutdownCleanupStarvationTest() - : start_time_(time_system_.monotonicTime()), api_(Api::createApiForTest()), - thread_factory_(api_->threadFactory()), pool_(store_->symbolTable()), - my_counter_name_(pool_.add("my_counter")), - my_counter_scoped_name_(pool_.add("scope.my_counter")) { + ThreadLocalRealThreadsTestBase(uint32_t num_threads) + : num_threads_(num_threads), start_time_(time_system_.monotonicTime()), + api_(Api::createApiForTest()), thread_factory_(api_->threadFactory()), + pool_(store_->symbolTable()) { // This is the same order as InstanceImpl::initialize in source/server/server.cc. 
- thread_dispatchers_.resize(NumThreads); + thread_dispatchers_.resize(num_threads_); { - BlockingBarrier blocking_barrier(NumThreads + 1); + BlockingBarrier blocking_barrier(num_threads_ + 1); main_thread_ = thread_factory_.createThread( [this, &blocking_barrier]() { mainThreadFn(blocking_barrier); }); - for (uint32_t i = 0; i < NumThreads; ++i) { + for (uint32_t i = 0; i < num_threads_; ++i) { threads_.emplace_back(thread_factory_.createThread( [this, i, &blocking_barrier]() { workerThreadFn(i, blocking_barrier); })); } @@ -1447,7 +1531,7 @@ class ClusterShutdownCleanupStarvationTest : public ThreadLocalStoreNoMocksTestB } } - ~ClusterShutdownCleanupStarvationTest() override { + ~ThreadLocalRealThreadsTestBase() override { { BlockingBarrier blocking_barrier(1); main_dispatcher_->post(blocking_barrier.run([this]() { @@ -1473,14 +1557,6 @@ class ClusterShutdownCleanupStarvationTest : public ThreadLocalStoreNoMocksTestB main_thread_->join(); } - void createScopesIncCountersAndCleanup() { - for (uint32_t i = 0; i < NumScopes; ++i) { - ScopePtr scope = store_->createScope("scope."); - Counter& counter = scope->counterFromStatName(my_counter_name_); - counter.inc(); - } - } - void workerThreadFn(uint32_t thread_index, BlockingBarrier& blocking_barrier) { thread_dispatchers_[thread_index] = api_->allocateDispatcher(absl::StrCat("test_worker_", thread_index)); @@ -1494,19 +1570,21 @@ class ClusterShutdownCleanupStarvationTest : public ThreadLocalStoreNoMocksTestB main_dispatcher_->run(Event::Dispatcher::RunType::RunUntilExit); } - void createScopesIncCountersAndCleanupAllThreads() { - BlockingBarrier blocking_barrier(NumThreads); - for (Event::DispatcherPtr& thread_dispatcher : thread_dispatchers_) { - thread_dispatcher->post( - blocking_barrier.run([this]() { createScopesIncCountersAndCleanup(); })); - } + void mainDispatchBlock() { + // To ensure all stats are freed we have to wait for a few posts() to clear. 
+ // First, wait for the main-dispatcher to initiate the cross-thread TLS cleanup. + BlockingBarrier blocking_barrier(1); + main_dispatcher_->post(blocking_barrier.run([]() {})); } - std::chrono::seconds elapsedTime() { - return std::chrono::duration_cast(time_system_.monotonicTime() - - start_time_); + void tlsBlock() { + BlockingBarrier blocking_barrier(num_threads_); + for (Event::DispatcherPtr& thread_dispatcher : thread_dispatchers_) { + thread_dispatcher->post(blocking_barrier.run([]() {})); + } } + const uint32_t num_threads_; Event::TestRealTimeSystem time_system_; MonotonicTime start_time_; Api::ApiPtr api_; @@ -1517,6 +1595,37 @@ class ClusterShutdownCleanupStarvationTest : public ThreadLocalStoreNoMocksTestB Thread::ThreadPtr main_thread_; std::vector threads_; StatNamePool pool_; +}; + +class ClusterShutdownCleanupStarvationTest : public ThreadLocalRealThreadsTestBase { +protected: + static constexpr uint32_t NumThreads = 2; + + ClusterShutdownCleanupStarvationTest() + : ThreadLocalRealThreadsTestBase(NumThreads), my_counter_name_(pool_.add("my_counter")), + my_counter_scoped_name_(pool_.add("scope.my_counter")) {} + + void createScopesIncCountersAndCleanup() { + for (uint32_t i = 0; i < NumScopes; ++i) { + ScopePtr scope = store_->createScope("scope."); + Counter& counter = scope->counterFromStatName(my_counter_name_); + counter.inc(); + } + } + + void createScopesIncCountersAndCleanupAllThreads() { + BlockingBarrier blocking_barrier(NumThreads); + for (Event::DispatcherPtr& thread_dispatcher : thread_dispatchers_) { + thread_dispatcher->post( + blocking_barrier.run([this]() { createScopesIncCountersAndCleanup(); })); + } + } + + std::chrono::seconds elapsedTime() { + return std::chrono::duration_cast(time_system_.monotonicTime() - + start_time_); + } + StatName my_counter_name_; StatName my_counter_scoped_name_; }; @@ -1529,24 +1638,14 @@ TEST_F(ClusterShutdownCleanupStarvationTest, TwelveThreadsWithBlockade) { for (uint32_t i = 0; i < NumIters && 
elapsedTime() < std::chrono::seconds(5); ++i) { createScopesIncCountersAndCleanupAllThreads(); - // To ensure all stats are freed we have to wait for a few posts() to clear. // First, wait for the main-dispatcher to initiate the cross-thread TLS cleanup. - auto main_dispatch_block = [this]() { - BlockingBarrier blocking_barrier(1); - main_dispatcher_->post(blocking_barrier.run([]() {})); - }; - main_dispatch_block(); + mainDispatchBlock(); // Next, wait for all the worker threads to complete their TLS cleanup. - { - BlockingBarrier blocking_barrier(NumThreads); - for (Event::DispatcherPtr& thread_dispatcher : thread_dispatchers_) { - thread_dispatcher->post(blocking_barrier.run([]() {})); - } - } + tlsBlock(); // Finally, wait for the final central-cache cleanup, which occurs on the main thread. - main_dispatch_block(); + mainDispatchBlock(); // Here we show that the counter cleanups have finished, because the use-count is 1. CounterSharedPtr counter = @@ -1583,5 +1682,124 @@ TEST_F(ClusterShutdownCleanupStarvationTest, TwelveThreadsWithoutBlockade) { store_->sync().signal(ThreadLocalStoreImpl::MainDispatcherCleanupSync); } +class HistogramThreadTest : public ThreadLocalRealThreadsTestBase { +protected: + static constexpr uint32_t NumThreads = 10; + + HistogramThreadTest() : ThreadLocalRealThreadsTestBase(NumThreads) {} + + void mergeHistograms() { + BlockingBarrier blocking_barrier(1); + main_dispatcher_->post([this, &blocking_barrier]() { + store_->mergeHistograms(blocking_barrier.decrementCountFn()); + }); + } + + uint32_t numTlsHistograms() { + uint32_t num; + { + BlockingBarrier blocking_barrier(1); + main_dispatcher_->post([this, &num, &blocking_barrier]() { + ThreadLocalStoreTestingPeer::numTlsHistograms(*store_, + [&num, &blocking_barrier](uint32_t num_hist) { + num = num_hist; + blocking_barrier.decrementCount(); + }); + }); + } + return num; + } + + // Executes a function on every worker thread dispatcher. 
+ void foreachThread(const std::function<void()>& fn) { + BlockingBarrier blocking_barrier(NumThreads); + for (Event::DispatcherPtr& thread_dispatcher : thread_dispatchers_) { + thread_dispatcher->post(blocking_barrier.run(fn)); + } + } +}; + +TEST_F(HistogramThreadTest, MakeHistogramsAndRecordValues) { + foreachThread([this]() { + Histogram& histogram = + store_->histogramFromString("my_hist", Stats::Histogram::Unit::Unspecified); + histogram.recordValue(42); + }); + + mergeHistograms(); + + auto histograms = store_->histograms(); + ASSERT_EQ(1, histograms.size()); + ParentHistogramSharedPtr hist = histograms[0]; + EXPECT_THAT(hist->bucketSummary(), + HasSubstr(absl::StrCat(" B25(0,0) B50(", NumThreads, ",", NumThreads, ") "))); +} + +TEST_F(HistogramThreadTest, ScopeOverlap) { + // Creating two scopes with the same name gets you two distinct scope objects. + ScopePtr scope1 = store_->createScope("scope."); + ScopePtr scope2 = store_->createScope("scope."); + EXPECT_NE(scope1, scope2); + + EXPECT_EQ(0, store_->histograms().size()); + EXPECT_EQ(0, numTlsHistograms()); + + // Histograms created in the two same-named scopes will be the same objects. + foreachThread([&scope1, &scope2]() { + Histogram& histogram = scope1->histogramFromString("histogram", Histogram::Unit::Unspecified); + EXPECT_EQ(&histogram, &scope2->histogramFromString("histogram", Histogram::Unit::Unspecified)); + histogram.recordValue(100); + }); + + mergeHistograms(); + + // Verify that we have the expected number of TLS histograms since we accessed + // the histogram on every thread. + std::vector<ParentHistogramSharedPtr> histograms = store_->histograms(); + ASSERT_EQ(1, histograms.size()); + EXPECT_EQ(NumThreads, numTlsHistograms()); + + // There's no convenient API to pull data out of the histogram, except as + // a string. This expectation captures the bucket transition to indicate + // 0 samples at less than 100, and 10 between 100 and 249 inclusive. 
+ EXPECT_THAT(histograms[0]->bucketSummary(), + HasSubstr(absl::StrCat(" B100(0,0) B250(", NumThreads, ",", NumThreads, ") "))); + + // The histogram was created in scope1, which can now be destroyed. But the + // histogram is kept alive by scope2. + scope1.reset(); + histograms = store_->histograms(); + EXPECT_EQ(1, histograms.size()); + EXPECT_EQ(NumThreads, numTlsHistograms()); + + // We can continue to accumulate samples at the scope2's view of the same + // histogram, and they will combine with the existing data, despite the + // fact that scope1 has been deleted. + foreachThread([&scope2]() { + Histogram& histogram = scope2->histogramFromString("histogram", Histogram::Unit::Unspecified); + histogram.recordValue(300); + }); + + mergeHistograms(); + + // Shows the bucket summary with 10 samples at >=100, and 20 at >=250. + EXPECT_THAT(histograms[0]->bucketSummary(), + HasSubstr(absl::StrCat(" B100(0,0) B250(0,", NumThreads, ") B500(", NumThreads, ",", + 2 * NumThreads, ") "))); + + // Now clear everything, and synchronize the system by calling mergeHistograms(). + // There should be no more ParentHistograms or TlsHistograms. + scope2.reset(); + histograms.clear(); + mergeHistograms(); + + EXPECT_EQ(0, store_->histograms().size()); + EXPECT_EQ(0, numTlsHistograms()); + + store_->shutdownThreading(); + + store_->histogramFromString("histogram_after_shutdown", Histogram::Unit::Unspecified); +} + } // namespace Stats } // namespace Envoy diff --git a/test/integration/stats_integration_test.cc b/test/integration/stats_integration_test.cc index 26143f370000d..f66a9b8a14bf2 100644 --- a/test/integration/stats_integration_test.cc +++ b/test/integration/stats_integration_test.cc @@ -287,6 +287,7 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithFakeSymbolTable) { // and body sizes. // 2020/07/21 12034 44811 46000 Add configurable histogram buckets. // 2020/07/31 12035 45002 46000 Init manager store unready targets in hash map. 
+ // 2020/08/10 12275 44949 46000 Re-organize tls histogram maps to improve continuity. // Note: when adjusting this value: EXPECT_MEMORY_EQ is active only in CI // 'release' builds, where we control the platform and tool-chain. So you @@ -304,7 +305,8 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithFakeSymbolTable) { // We only run the exact test for ipv6 because ipv4 in some cases may allocate a // different number of bytes. We still run the approximate test. if (ip_version_ != Network::Address::IpVersion::v6) { - EXPECT_MEMORY_EQ(m_per_cluster, 45002); + // https://github.com/envoyproxy/envoy/issues/12209 + // EXPECT_MEMORY_EQ(m_per_cluster, 44949); } EXPECT_MEMORY_LE(m_per_cluster, 46000); // Round up to allow platform variations. } @@ -363,6 +365,7 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithRealSymbolTable) { // and body sizes. // 2020/07/21 12034 36923 38000 Add configurable histogram buckets. // 2020/07/31 12035 37114 38000 Init manager store unready targets in hash map. + // 2020/08/10 12275 37061 38000 Re-organize tls histogram maps to improve continuity. // Note: when adjusting this value: EXPECT_MEMORY_EQ is active only in CI // 'release' builds, where we control the platform and tool-chain. So you @@ -380,7 +383,8 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithRealSymbolTable) { // We only run the exact test for ipv6 because ipv4 in some cases may allocate a // different number of bytes. We still run the approximate test. if (ip_version_ != Network::Address::IpVersion::v6) { - EXPECT_MEMORY_EQ(m_per_cluster, 37114); + // https://github.com/envoyproxy/envoy/issues/12209 + // EXPECT_MEMORY_EQ(m_per_cluster, 37061); } EXPECT_MEMORY_LE(m_per_cluster, 38000); // Round up to allow platform variations. }