Merged
7 changes: 7 additions & 0 deletions include/envoy/thread_local/thread_local.h
@@ -226,6 +226,13 @@ class Instance : public SlotAllocator {
* @return Event::Dispatcher& the thread local dispatcher.
*/
virtual Event::Dispatcher& dispatcher() PURE;

/**
* Returns whether or not global threading has been shut down.
*
* @return true if global threading has been shut down, false if not.
*/
virtual bool isShutdown() const PURE;
};

} // namespace ThreadLocal
113 changes: 85 additions & 28 deletions source/common/stats/thread_local_store.cc
@@ -43,6 +43,9 @@ ThreadLocalStoreImpl::~ThreadLocalStoreImpl() {
ASSERT(shutting_down_ || !threading_ever_initialized_);
default_scope_.reset();
ASSERT(scopes_.empty());
ASSERT(scopes_to_cleanup_.empty());
ASSERT(central_cache_entries_to_cleanup_.empty());
ASSERT(histograms_to_cleanup_.empty());
}

void ThreadLocalStoreImpl::setHistogramSettings(HistogramSettingsConstPtr&& histogram_settings) {
@@ -194,11 +197,23 @@ void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_di
tls_cache_ = ThreadLocal::TypedSlot<TlsCache>::makeUnique(tls);
tls_cache_->set(
[](Event::Dispatcher&) -> std::shared_ptr<TlsCache> { return std::make_shared<TlsCache>(); });
tls_ = tls;
}

void ThreadLocalStoreImpl::shutdownThreading() {
// This will block both future cache fills as well as cache flushes.
shutting_down_ = true;
ASSERT(!tls_.has_value() || tls_->isShutdown());

// We can't call runOnAllThreads here as global threading has already been shutdown. It is okay
// to simply clear the scopes and central cache entries here as they will be cleaned up during
// thread local data cleanup in InstanceImpl::shutdownThread().
{
Thread::LockGuard lock(lock_);
scopes_to_cleanup_.clear();
central_cache_entries_to_cleanup_.clear();
}

Thread::LockGuard lock(hist_mutex_);
for (ParentHistogramImpl* histogram : histogram_set_) {
histogram->setShuttingDown(true);
Expand Down Expand Up @@ -261,31 +276,50 @@ void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) {
//
// Since this is called from ScopeImpl's destructor, we must bump the
// ref-count of the central-cache by copying to a local scoped pointer, and
// keep that reference alive until all the TLS caches are clear.
CentralCacheEntrySharedPtr central_cache = scope->central_cache_;
// keep that reference alive until all the TLS caches are clear. This is done by keeping a
// separate vector of shared_ptrs which will be destructed once all threads have completed.

// This can happen from any thread. We post() back to the main thread which will initiate the
// cache flush operation.
if (!shutting_down_ && main_thread_dispatcher_) {
const uint64_t scope_id = scope->scope_id_;
// Clear scopes in a batch. It's possible that many different scopes will be deleted at
// the same time, before the main thread gets a chance to run cleanScopesFromCaches. If a new
// scope is deleted before that post runs, we add it to our list of scopes to clear, and there
// is no need to issue another post. This greatly reduces the overhead when there are tens of
thousands of scopes to clear in a short period, e.g. VHDS updates with tens of thousands of
// VirtualHosts.
bool need_post = scopes_to_cleanup_.empty();
Contributor: Mind including an explanation why we post when this is empty? I assume this is the batching mechanism so that we don't post while there is a post in progress, but some detail would be nice to have here.

Contributor Author: will do.

Contributor Author: updating releaseScopesCrossThread to have some of the same info as releaseHistogramCrossThread. This should cover this and the additional comment below. :)

scopes_to_cleanup_.push_back(scope->scope_id_);
central_cache_entries_to_cleanup_.push_back(scope->central_cache_);
lock.release();

// TODO(jmarantz): consider batching all the scope IDs that should be
// cleared from TLS caches to reduce bursts of runOnAllThreads on a large
// config update. See the pattern below used for histograms.
main_thread_dispatcher_->post([this, central_cache, scope_id]() {
sync_.syncPoint(MainDispatcherCleanupSync);
clearScopeFromCaches(scope_id, central_cache);
});
if (need_post) {
main_thread_dispatcher_->post([this]() {
sync_.syncPoint(MainDispatcherCleanupSync);
clearScopesFromCaches();
});
}
}
}
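The empty-check batching above can be sketched in isolation. This is a minimal illustration under stated assumptions: `BatchCleaner` and `post_count` are hypothetical names, and `post_count` stands in for `main_thread_dispatcher_->post()`:

```cpp
#include <cassert>
#include <mutex>
#include <utility>
#include <vector>

// Minimal sketch of the "post only on the empty -> non-empty transition"
// batching pattern. Only the release that makes the pending list non-empty
// schedules a flush; subsequent releases piggyback on the scheduled one.
class BatchCleaner {
public:
  // Called from any thread when an object is released.
  void release(uint64_t id) {
    bool need_post;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      need_post = pending_.empty();
      pending_.push_back(id);
    }
    if (need_post) {
      ++post_count;  // stand-in for posting a flush to the main thread
    }
  }

  // Runs on the "main thread": drains the whole batch at once.
  std::vector<uint64_t> flush() {
    std::lock_guard<std::mutex> guard(mutex_);
    std::vector<uint64_t> batch = std::move(pending_);
    pending_.clear();
    return batch;
  }

  int post_count = 0;

private:
  std::mutex mutex_;
  std::vector<uint64_t> pending_;
};
```

With this shape, releasing tens of thousands of scopes between two main-thread turns costs a single post rather than one post per scope.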

void ThreadLocalStoreImpl::releaseHistogramCrossThread(uint64_t histogram_id) {
// This can happen from any thread. We post() back to the main thread which will initiate the
// cache flush operation.
if (!shutting_down_ && main_thread_dispatcher_) {
main_thread_dispatcher_->post(
[this, histogram_id]() { clearHistogramFromCaches(histogram_id); });
// It's possible that many different histograms will be deleted at the same
// time, before the main thread gets a chance to run
// clearHistogramsFromCaches. If a new histogram is deleted before that
// post runs, we add it to our list of histograms to clear, and there's no
// need to issue another post.
Comment on lines +311 to +313

Contributor: I see this is explained here, would be good to have this explanation for the above scope release :)

bool need_post = false;
{
Thread::LockGuard lock(hist_mutex_);
need_post = histograms_to_cleanup_.empty();
histograms_to_cleanup_.push_back(histogram_id);
}
if (need_post) {
main_thread_dispatcher_->post([this]() { clearHistogramsFromCaches(); });
}
}
}

@@ -294,39 +328,62 @@ ThreadLocalStoreImpl::TlsCache::insertScope(uint64_t scope_id) {
return scope_cache_[scope_id];
}

void ThreadLocalStoreImpl::TlsCache::eraseScope(uint64_t scope_id) { scope_cache_.erase(scope_id); }
void ThreadLocalStoreImpl::TlsCache::eraseHistogram(uint64_t histogram_id) {
void ThreadLocalStoreImpl::TlsCache::eraseScopes(const std::vector<uint64_t>& scope_ids) {
for (uint64_t scope_id : scope_ids) {
scope_cache_.erase(scope_id);
}
}

void ThreadLocalStoreImpl::TlsCache::eraseHistograms(const std::vector<uint64_t>& histograms) {
// This is called for every histogram in every thread, even though the
// histogram may not have been cached in each thread yet. So we don't
// want to check whether the erase() call erased anything.
tls_histogram_cache_.erase(histogram_id);
for (uint64_t histogram_id : histograms) {
tls_histogram_cache_.erase(histogram_id);
}
}

void ThreadLocalStoreImpl::clearScopeFromCaches(uint64_t scope_id,
CentralCacheEntrySharedPtr central_cache) {
void ThreadLocalStoreImpl::clearScopesFromCaches() {
// If we are shutting down we no longer perform cache flushes as workers may be shutting down
// at the same time.
if (!shutting_down_) {
// Perform a cache flush on all threads.

// Capture all the pending scope ids in a local, clearing the list held in
// this. Once this occurs, if a new scope is deleted, a new post will be
// required.
auto scope_ids = std::make_shared<std::vector<uint64_t>>();
// Capture all the central cache entries for scopes we're deleting. These will be freed after
// all threads have completed.
auto central_caches = std::make_shared<std::vector<CentralCacheEntrySharedPtr>>();
{
Thread::LockGuard lock(lock_);
*scope_ids = std::move(scopes_to_cleanup_);
scopes_to_cleanup_.clear();
*central_caches = std::move(central_cache_entries_to_cleanup_);
central_cache_entries_to_cleanup_.clear();
}

tls_cache_->runOnAllThreads(
[scope_id](OptRef<TlsCache> tls_cache) { tls_cache->eraseScope(scope_id); },
[central_cache]() { /* Holds onto central_cache until all tls caches are clear */ });
[scope_ids](OptRef<TlsCache> tls_cache) { tls_cache->eraseScopes(*scope_ids); },
[central_caches]() { /* Holds onto central_caches until all tls caches are clear */ });
}
}
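The keep-alive trick in the completion callback ("Holds onto central_caches until all tls caches are clear") can be shown standalone. In this sketch, `Entry`, `entries_to_cleanup`, and `drainForCleanup` are hypothetical stand-ins for the Envoy types; the returned callback plays the role of the second argument to `runOnAllThreads()`:

```cpp
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Stand-in for Stats::CentralCacheEntry.
struct Entry {};
using EntryPtr = std::shared_ptr<Entry>;

std::mutex cleanup_lock;
std::vector<EntryPtr> entries_to_cleanup;  // guarded by cleanup_lock

// Drains the pending entries under the lock and moves them into a completion
// callback. The entries stay alive until the callback itself is destroyed,
// i.e. until every thread has finished erasing its TLS cache.
std::function<void()> drainForCleanup() {
  auto batch = std::make_shared<std::vector<EntryPtr>>();
  {
    std::lock_guard<std::mutex> guard(cleanup_lock);
    *batch = std::move(entries_to_cleanup);
    entries_to_cleanup.clear();
  }
  // Holds onto `batch` (and thus every entry) until invoked and destroyed.
  return [batch]() {};
}
```

Destroying the callback is what finally releases the central cache entries, which is why the pattern is safe even though the scope's destructor has already returned.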

void ThreadLocalStoreImpl::clearHistogramFromCaches(uint64_t histogram_id) {
void ThreadLocalStoreImpl::clearHistogramsFromCaches() {
// If we are shutting down we no longer perform cache flushes as workers may be shutting down
// at the same time.
if (!shutting_down_) {
// Perform a cache flush on all threads.
//
// TODO(jmarantz): If this cross-thread posting proves to be a performance
// bottleneck,
// https://gist.github.com/jmarantz/838cb6de7e74c0970ea6b63eded0139a
// contains a patch that will implement batching together to clear multiple
// histograms.
// Move the histograms pending cleanup into a local variable. Future histogram deletions will be
// batched until the next time this function is called.
auto histograms = std::make_shared<std::vector<uint64_t>>();
{
Thread::LockGuard lock(hist_mutex_);
histograms->swap(histograms_to_cleanup_);
}

tls_cache_->runOnAllThreads(
[histogram_id](OptRef<TlsCache> tls_cache) { tls_cache->eraseHistogram(histogram_id); });
[histograms](OptRef<TlsCache> tls_cache) { tls_cache->eraseHistograms(*histograms); });
}
}

22 changes: 18 additions & 4 deletions source/common/stats/thread_local_store.h
@@ -445,8 +445,8 @@ class ThreadLocalStoreImpl : Logger::Loggable<Logger::Id::stats>, public StoreRo

struct TlsCache : public ThreadLocal::ThreadLocalObject {
TlsCacheEntry& insertScope(uint64_t scope_id);
void eraseScope(uint64_t scope_id);
void eraseHistogram(uint64_t histogram);
void eraseScopes(const std::vector<uint64_t>& scope_ids);
void eraseHistograms(const std::vector<uint64_t>& histograms);

// The TLS scope cache is keyed by scope ID. This is used to avoid complex circular references
// during scope destruction. An ID is required vs. using the address of the scope pointer
@@ -472,8 +472,8 @@ class ThreadLocalStoreImpl : Logger::Loggable<Logger::Id::stats>, public StoreRo
}

std::string getTagsForName(const std::string& name, TagVector& tags) const;
void clearScopeFromCaches(uint64_t scope_id, CentralCacheEntrySharedPtr central_cache);
void clearHistogramFromCaches(uint64_t histogram_id);
void clearScopesFromCaches();
void clearHistogramsFromCaches();
void releaseScopeCrossThread(ScopeImpl* scope);
void mergeInternal(PostMergeCb merge_cb);
bool rejects(StatName name) const;
@@ -499,6 +499,7 @@ class ThreadLocalStoreImpl : Logger::Loggable<Logger::Id::stats>, public StoreRo
std::atomic<bool> shutting_down_{};
std::atomic<bool> merge_in_progress_{};
AllocatorImpl heap_allocator_;
OptRef<ThreadLocal::Instance> tls_;

NullCounterImpl null_counter_;
NullGaugeImpl null_gauge_;
@@ -526,6 +527,19 @@ class ThreadLocalStoreImpl : Logger::Loggable<Logger::Id::stats>, public StoreRo
std::vector<GaugeSharedPtr> deleted_gauges_ ABSL_GUARDED_BY(lock_);
std::vector<HistogramSharedPtr> deleted_histograms_ ABSL_GUARDED_BY(lock_);
std::vector<TextReadoutSharedPtr> deleted_text_readouts_ ABSL_GUARDED_BY(lock_);

// Scope IDs and central cache entries that are queued for cross-thread cleanup.
// Because there can be a large number of scopes, all of which are released at
// once (e.g. a VHDS update removing tens of thousands of VirtualHosts), it is
// more efficient to batch their cleanup, which would otherwise entail a post()
// per scope per thread.
std::vector<uint64_t> scopes_to_cleanup_ ABSL_GUARDED_BY(lock_);
std::vector<CentralCacheEntrySharedPtr> central_cache_entries_to_cleanup_ ABSL_GUARDED_BY(lock_);

// Histogram IDs that are queued for cross-thread cleanup. Because there
// can be a large number of histograms, all of which are released at once,
// (e.g. when a scope is deleted), it is likely more efficient to batch their
// cleanup, which would otherwise entail a post() per histogram per thread.
std::vector<uint64_t> histograms_to_cleanup_ ABSL_GUARDED_BY(hist_mutex_);
};

using ThreadLocalStoreImplPtr = std::unique_ptr<ThreadLocalStoreImpl>;
1 change: 1 addition & 0 deletions source/common/thread_local/thread_local_impl.h
@@ -28,6 +28,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, pub
void shutdownGlobalThreading() override;
void shutdownThread() override;
Event::Dispatcher& dispatcher() override;
bool isShutdown() const override { return shutdown_; }

private:
// On destruction returns the slot index to the deferred delete queue (detaches it). This allows
9 changes: 4 additions & 5 deletions test/common/stats/thread_local_store_speed_test.cc
@@ -29,17 +29,16 @@ class ThreadLocalStorePerf {
store_.setTagProducer(std::make_unique<Stats::TagProducerImpl>(stats_config_));

Stats::TestUtil::forEachSampleStat(1000, [this](absl::string_view name) {
stat_names_.push_back(std::make_unique<Stats::StatNameStorage>(name, symbol_table_));
stat_names_.push_back(std::make_unique<Stats::StatNameManagedStorage>(name, symbol_table_));
});
}

~ThreadLocalStorePerf() {
for (auto& stat_name_storage : stat_names_) {
stat_name_storage->free(symbol_table_);
if (tls_) {
tls_->shutdownGlobalThreading();
}
store_.shutdownThreading();
Contributor: What's the purpose of this change?

Contributor: In production code we were shutting down threading this way, where ThreadLocal::shutdownGlobalThreading() was called first. That means in shutdownThreading() we can't then go through the runOnAllThreads sequence to clean up; we just have to remove the elements directly.

This change, and others in some tests, are to make tests shut down threading in the same order as production code. The ASSERT(tls_->isShutdown()) validates the claim made below it:

// We can't call runOnAllThreads here as global threading has already been shutdown.

and a lot of the trivial test changes are to avoid having that assert crash in unit tests.
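The ordering this thread describes can be sketched with hypothetical stand-in types; `FakeTls` and `FakeStore` below are illustrative only, not Envoy classes:

```cpp
#include <cassert>

// Stand-in for ThreadLocal::InstanceImpl.
struct FakeTls {
  void shutdownGlobalThreading() { shutdown_ = true; }
  bool isShutdown() const { return shutdown_; }
  void shutdownThread() { /* would free thread-local data here */ }
  bool shutdown_ = false;
};

// Stand-in for ThreadLocalStoreImpl.
struct FakeStore {
  explicit FakeStore(FakeTls& tls) : tls_(tls) {}
  void shutdownThreading() {
    // Mirrors ASSERT(!tls_.has_value() || tls_->isShutdown()): by the time
    // the store shuts down, runOnAllThreads is no longer available, so
    // pending cleanup lists must be cleared directly instead.
    assert(tls_.isShutdown());
  }
  FakeTls& tls_;
};
```

The production order is therefore: shut down global threading first, then the store, then the per-thread cleanup that actually frees the TLS data.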

if (tls_) {
tls_->shutdownGlobalThreading();
tls_->shutdownThread();
}
if (dispatcher_) {
@@ -72,7 +71,7 @@ class ThreadLocalStorePerf {
Stats::ThreadLocalStoreImpl store_;
Api::ApiPtr api_;
envoy::config::metrics::v3::StatsConfig stats_config_;
std::vector<std::unique_ptr<Stats::StatNameStorage>> stat_names_;
std::vector<std::unique_ptr<Stats::StatNameManagedStorage>> stat_names_;
};

} // namespace Envoy