Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 80 additions & 28 deletions source/common/stats/thread_local_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ ThreadLocalStoreImpl::~ThreadLocalStoreImpl() {
ASSERT(shutting_down_ || !threading_ever_initialized_);
default_scope_.reset();
ASSERT(scopes_.empty());
ASSERT(scopes_to_cleanup_.empty());
Comment thread
jmarantz marked this conversation as resolved.
ASSERT(central_cache_entries_to_cleanup_.empty());
ASSERT(histograms_to_cleanup_.empty());
}

void ThreadLocalStoreImpl::setHistogramSettings(HistogramSettingsConstPtr&& histogram_settings) {
Expand Down Expand Up @@ -199,6 +202,15 @@ void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_di
void ThreadLocalStoreImpl::shutdownThreading() {
// This will block both future cache fills as well as cache flushes.
shutting_down_ = true;

// We can't call runOnAllThreads here as global threading has already been shut down.
Comment thread
jmarantz marked this conversation as resolved.
Outdated
// It is okay to simply clear the scopes and central cache entries to cleanup.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you include an explanation for why this is okay to do?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will update. How is:
// We can't call runOnAllThreads here as global threading has already been shut down. It is okay
// to simply clear the scopes and central cache entries here as they will be cleaned up during
// thread local data cleanup in InstanceImpl::shutdownThread().

{
Thread::LockGuard lock(lock_);
scopes_to_cleanup_.clear();
central_cache_entries_to_cleanup_.clear();
}

Thread::LockGuard lock(hist_mutex_);
for (ParentHistogramImpl* histogram : histogram_set_) {
histogram->setShuttingDown(true);
Expand Down Expand Up @@ -261,31 +273,47 @@ void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) {
//
// Since this is called from ScopeImpl's destructor, we must bump the
// ref-count of the central-cache by copying to a local scoped pointer, and
// keep that reference alive until all the TLS caches are clear.
CentralCacheEntrySharedPtr central_cache = scope->central_cache_;
// keep that reference alive until all the TLS caches are clear. This is done by keeping a
// separate vector of shared_ptrs which will be destructed once all threads have completed.

// This can happen from any thread. We post() back to the main thread which will initiate the
// cache flush operation.
if (!shutting_down_ && main_thread_dispatcher_) {
const uint64_t scope_id = scope->scope_id_;
// Clear scopes in a batch. This greatly reduces the overhead when there are

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/Switch to batching of clearing scopes/Clear scopes in a batch/ as the reader of this won't be anchoring on the previous implementation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will change

// tens of thousands of scopes to clear in a short period, e.g. VHDS updates with tens of
// thousands of VirtualHosts.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: end sentences with period.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix

bool need_post = scopes_to_cleanup_.empty();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind including an explanation why we post when this is empty? I assume this is the batching mechanism so that we don't post while there is a post in progress, but some detail would be nice to have here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating releaseScopeCrossThread to have some of the same info as releaseHistogramCrossThread. This should cover this and the additional comment below. :)

scopes_to_cleanup_.push_back(scope->scope_id_);
central_cache_entries_to_cleanup_.push_back(scope->central_cache_);
lock.release();

// TODO(jmarantz): consider batching all the scope IDs that should be
// cleared from TLS caches to reduce bursts of runOnAllThreads on a large
// config update. See the pattern below used for histograms.
main_thread_dispatcher_->post([this, central_cache, scope_id]() {
sync_.syncPoint(MainDispatcherCleanupSync);
clearScopeFromCaches(scope_id, central_cache);
});
if (need_post) {
main_thread_dispatcher_->post([this]() {
sync_.syncPoint(MainDispatcherCleanupSync);
clearScopesFromCaches();
});
}
}
}

void ThreadLocalStoreImpl::releaseHistogramCrossThread(uint64_t histogram_id) {
// This can happen from any thread. We post() back to the main thread which will initiate the
// cache flush operation.
if (!shutting_down_ && main_thread_dispatcher_) {
main_thread_dispatcher_->post(
[this, histogram_id]() { clearHistogramFromCaches(histogram_id); });
// It's possible that many different histograms will be deleted at the same
// time, before the main thread gets a chance to run
// clearHistogramsFromCaches. If a new histogram is deleted before that
// post runs, we add it to our list of histograms to clear, and there's no
// need to issue another post.
Comment on lines +311 to +313

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this is explained here, would be good to have this explanation for the above scope release :)

bool need_post = false;
{
Thread::LockGuard lock(hist_mutex_);
need_post = histograms_to_cleanup_.empty();
histograms_to_cleanup_.push_back(histogram_id);
}
if (need_post) {
main_thread_dispatcher_->post([this]() { clearHistogramsFromCaches(); });
}
}
}

Expand All @@ -294,39 +322,63 @@ ThreadLocalStoreImpl::TlsCache::insertScope(uint64_t scope_id) {
return scope_cache_[scope_id];
}

void ThreadLocalStoreImpl::TlsCache::eraseScope(uint64_t scope_id) { scope_cache_.erase(scope_id); }
void ThreadLocalStoreImpl::TlsCache::eraseHistogram(uint64_t histogram_id) {
// Removes the entry for each of the given scope IDs from this TLS cache's scope cache.
// IDs without a cached entry are simply skipped by erase().
void ThreadLocalStoreImpl::TlsCache::eraseScopes(const std::vector<uint64_t>& scope_ids) {
  for (const uint64_t id : scope_ids) {
    scope_cache_.erase(id);
  }
}

void ThreadLocalStoreImpl::TlsCache::eraseHistograms(const std::vector<uint64_t>& histograms) {
// This is called for every histogram in every thread, even though the
// histogram may not have been cached in each thread yet. So we don't
// want to check whether the erase() call erased anything.
tls_histogram_cache_.erase(histogram_id);
for (uint64_t histogram_id : histograms) {
tls_histogram_cache_.erase(histogram_id);
}
}

void ThreadLocalStoreImpl::clearScopeFromCaches(uint64_t scope_id,
CentralCacheEntrySharedPtr central_cache) {
void ThreadLocalStoreImpl::clearScopesFromCaches() {
// If we are shutting down we no longer perform cache flushes as workers may be shutting down
// at the same time.
if (!shutting_down_) {
// Perform a cache flush on all threads.

// Move the scope IDs pending cleanup into a local variable, leaving the
// member list empty. If another scope is deleted after this point, a new
// post will be required.
auto scope_ids = std::make_shared<std::vector<uint64_t>>();
// Capture all the central cache entries for scopes we're deleting. These will be freed after
// all threads have completed.
auto central_caches = std::make_shared<std::vector<CentralCacheEntrySharedPtr>>();
{
Thread::LockGuard lock(lock_);
*scope_ids = std::move(scopes_to_cleanup_);
scopes_to_cleanup_.clear();
*central_caches = std::move(central_cache_entries_to_cleanup_);
central_cache_entries_to_cleanup_.clear();
}

tls_cache_->runOnAllThreads(
[scope_id](OptRef<TlsCache> tls_cache) { tls_cache->eraseScope(scope_id); },
[central_cache]() { /* Holds onto central_cache until all tls caches are clear */ });
[scope_ids](OptRef<TlsCache> tls_cache) { tls_cache->eraseScopes(*scope_ids); },
[central_caches]() { /* Holds onto central_caches until all tls caches are clear */ });
}
}

void ThreadLocalStoreImpl::clearHistogramFromCaches(uint64_t histogram_id) {
void ThreadLocalStoreImpl::clearHistogramsFromCaches() {
// If we are shutting down we no longer perform cache flushes as workers may be shutting down
// at the same time.
if (!shutting_down_) {
// Perform a cache flush on all threads.
//
// TODO(jmarantz): If this cross-thread posting proves to be a performance
// bottleneck,
// https://gist.github.com/jmarantz/838cb6de7e74c0970ea6b63eded0139a
// contains a patch that will implement batching together to clear multiple
// histograms.
// Move the histograms pending cleanup into a local variable, leaving the member
// list empty. After this point, if a new histogram is deleted, a new post will be

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is a bit hard to read because talking about `this` makes the sentence seem incomplete. Maybe rephrase? Something like: "Move the histograms pending cleanup into a local variable. Future histogram deletions will be batched until the next time this function is called."

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will reword

// required.
auto histograms = std::make_shared<std::vector<uint64_t>>();
{
Thread::LockGuard lock(hist_mutex_);
histograms->swap(histograms_to_cleanup_);
}

tls_cache_->runOnAllThreads(
[histogram_id](OptRef<TlsCache> tls_cache) { tls_cache->eraseHistogram(histogram_id); });
[histograms](OptRef<TlsCache> tls_cache) { tls_cache->eraseHistograms(*histograms); });
}
}

Expand Down
21 changes: 17 additions & 4 deletions source/common/stats/thread_local_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ class ThreadLocalStoreImpl : Logger::Loggable<Logger::Id::stats>, public StoreRo

struct TlsCache : public ThreadLocal::ThreadLocalObject {
TlsCacheEntry& insertScope(uint64_t scope_id);
void eraseScope(uint64_t scope_id);
void eraseHistogram(uint64_t histogram);
void eraseScopes(const std::vector<uint64_t>& scope_ids);
void eraseHistograms(const std::vector<uint64_t>& histograms);

// The TLS scope cache is keyed by scope ID. This is used to avoid complex circular references
// during scope destruction. An ID is required vs. using the address of the scope pointer
Expand All @@ -472,8 +472,8 @@ class ThreadLocalStoreImpl : Logger::Loggable<Logger::Id::stats>, public StoreRo
}

std::string getTagsForName(const std::string& name, TagVector& tags) const;
void clearScopeFromCaches(uint64_t scope_id, CentralCacheEntrySharedPtr central_cache);
void clearHistogramFromCaches(uint64_t histogram_id);
void clearScopesFromCaches();
void clearHistogramsFromCaches();
void releaseScopeCrossThread(ScopeImpl* scope);
void mergeInternal(PostMergeCb merge_cb);
bool rejects(StatName name) const;
Expand Down Expand Up @@ -526,6 +526,19 @@ class ThreadLocalStoreImpl : Logger::Loggable<Logger::Id::stats>, public StoreRo
std::vector<GaugeSharedPtr> deleted_gauges_ ABSL_GUARDED_BY(lock_);
std::vector<HistogramSharedPtr> deleted_histograms_ ABSL_GUARDED_BY(lock_);
std::vector<TextReadoutSharedPtr> deleted_text_readouts_ ABSL_GUARDED_BY(lock_);

// Scope IDs and central cache entries that are queued for cross-thread release.
// Because there can be a large number of scopes, all of which are released at once
// (e.g. on a large config update), it is more efficient to batch their cleanup,
// which would otherwise entail a post() per scope per thread.
std::vector<uint64_t> scopes_to_cleanup_ ABSL_GUARDED_BY(lock_);
std::vector<CentralCacheEntrySharedPtr> central_cache_entries_to_cleanup_ ABSL_GUARDED_BY(lock_);

// Histogram IDs that are queued for cross-thread release. Because there
// can be a large number of histograms, all of which are released at once
// (e.g. when a scope is deleted), it is likely more efficient to batch their
// cleanup, which would otherwise entail a post() per histogram per thread.
std::vector<uint64_t> histograms_to_cleanup_ ABSL_GUARDED_BY(hist_mutex_);
};

using ThreadLocalStoreImplPtr = std::unique_ptr<ThreadLocalStoreImpl>;
Expand Down
20 changes: 17 additions & 3 deletions test/common/stats/thread_local_store_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class HistogramTest : public testing::Test {
}

void TearDown() override {
tls_.shutdownGlobalThreading();
Comment thread
jmarantz marked this conversation as resolved.
store_->shutdownThreading();
tls_.shutdownThread();
}
Expand Down Expand Up @@ -318,6 +319,7 @@ TEST_F(StatsThreadLocalStoreTest, Tls) {
EXPECT_EQ(&t1, store_->textReadouts().front().get()); // front() ok when size()==1
EXPECT_EQ(2UL, store_->textReadouts().front().use_count());

tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();

Expand Down Expand Up @@ -415,6 +417,7 @@ TEST_F(StatsThreadLocalStoreTest, BasicScope) {
Stats::Histogram::Unit::Unspecified));
}

tls_.shutdownGlobalThreading();
store_->shutdownThreading();
scope1->deliverHistogramToSinks(h1, 100);
scope1->deliverHistogramToSinks(h2, 200);
Expand Down Expand Up @@ -460,6 +463,7 @@ TEST_F(StatsThreadLocalStoreTest, HistogramScopeOverlap) {
EXPECT_EQ(0, store_->histograms().size());
EXPECT_EQ(0, numTlsHistograms());

tls_.shutdownGlobalThreading();
store_->shutdownThreading();

store_->histogramFromString("histogram_after_shutdown", Histogram::Unit::Unspecified);
Expand All @@ -476,6 +480,7 @@ TEST_F(StatsThreadLocalStoreTest, SanitizePrefix) {
Counter& c1 = scope1->counterFromString("c1");
EXPECT_EQ("scope1___foo.c1", c1.name());

tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
Expand All @@ -499,13 +504,14 @@ TEST_F(StatsThreadLocalStoreTest, ScopeDelete) {
EXPECT_EQ("scope1.c1", c1->name());

EXPECT_CALL(main_thread_dispatcher_, post(_));
EXPECT_CALL(tls_, runOnAllThreads(_, _));
EXPECT_CALL(tls_, runOnAllThreads(_, _)).Times(testing::AtLeast(1));
scope1.reset();
EXPECT_EQ(0UL, store_->counters().size());

EXPECT_EQ(1L, c1.use_count());
c1.reset();

tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
Expand Down Expand Up @@ -541,6 +547,7 @@ TEST_F(StatsThreadLocalStoreTest, NestedScopes) {
TextReadout& t1 = scope2->textReadoutFromString("some_string");
EXPECT_EQ("scope1.foo.some_string", t1.name());

tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
Expand Down Expand Up @@ -605,6 +612,7 @@ TEST_F(StatsThreadLocalStoreTest, OverlappingScopes) {
EXPECT_EQ("abc", t2.value());
EXPECT_EQ(1UL, store_->textReadouts().size());

tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
Expand Down Expand Up @@ -650,6 +658,7 @@ TEST_F(StatsThreadLocalStoreTest, TextReadoutAllLengths) {
t.set("");
EXPECT_EQ("", t.value());

tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
Expand Down Expand Up @@ -938,6 +947,7 @@ class RememberStatsMatcherTest : public testing::TestWithParam<bool> {
}

// Tears down threading for the fixture. The calls are ordered deliberately: global
// threading is shut down first, then the store's threading, and finally the
// thread-local data for this thread.
~RememberStatsMatcherTest() override {
tls_.shutdownGlobalThreading();
store_.shutdownThreading();
tls_.shutdownThread();
}
Expand Down Expand Up @@ -1111,6 +1121,7 @@ TEST_F(StatsThreadLocalStoreTest, RemoveRejectedStats) {
EXPECT_CALL(sink_, onHistogramComplete(Ref(histogram), 42));
histogram.recordValue(42);
textReadout.set("fortytwo");
tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
Expand All @@ -1127,6 +1138,7 @@ TEST_F(StatsThreadLocalStoreTest, NonHotRestartNoTruncation) {
// This works fine, and we can find it by its long name because heap-stats do not
// get truncated.
EXPECT_NE(nullptr, TestUtility::findCounter(*store_, name_1).get());
tls_.shutdownGlobalThreading();
store_->shutdownThreading();
tls_.shutdownThread();
}
Expand All @@ -1143,6 +1155,7 @@ class StatsThreadLocalStoreTestNoFixture : public testing::Test {

~StatsThreadLocalStoreTestNoFixture() override {
if (threading_enabled_) {
tls_.shutdownGlobalThreading();
store_.shutdownThreading();
tls_.shutdownThread();
}
Expand Down Expand Up @@ -1221,7 +1234,6 @@ TEST_F(StatsThreadLocalStoreTest, MergeDuringShutDown) {

EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 1));
h1.recordValue(1);

store_->shutdownThreading();

// Validate that merge callback is called during shutdown and there is no ASSERT.
Expand All @@ -1243,7 +1255,9 @@ TEST(ThreadLocalStoreThreadTest, ConstructDestruct) {

store.initializeThreading(*dispatcher, tls);
{ ScopePtr scope1 = store.createScope("scope1."); }
tls.shutdownGlobalThreading();
store.shutdownThreading();
tls.shutdownThread();
}

// Histogram tests
Expand Down Expand Up @@ -1505,8 +1519,8 @@ class ThreadLocalRealThreadsTestBase : public ThreadLocalStoreNoMocksTestBase {
{
BlockingBarrier blocking_barrier(1);
main_dispatcher_->post(blocking_barrier.run([this]() {
store_->shutdownThreading();
tls_->shutdownGlobalThreading();
store_->shutdownThreading();
tls_->shutdownThread();
}));
}
Expand Down