Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 112 additions & 40 deletions source/common/stats/thread_local_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
namespace Envoy {
namespace Stats {

const char ThreadLocalStoreImpl::DeleteScopeSync[] = "delete-scope";
const char ThreadLocalStoreImpl::IterateScopeSync[] = "iterate-scope";
const char ThreadLocalStoreImpl::MainDispatcherCleanupSync[] = "main-dispatcher-cleanup";

ThreadLocalStoreImpl::ThreadLocalStoreImpl(Allocator& alloc)
Expand All @@ -48,10 +50,10 @@ ThreadLocalStoreImpl::~ThreadLocalStoreImpl() {
}

void ThreadLocalStoreImpl::setHistogramSettings(HistogramSettingsConstPtr&& histogram_settings) {
Thread::LockGuard lock(lock_);
for (ScopeImpl* scope : scopes_) {
ASSERT(scope->central_cache_->histograms_.empty());
}
iterateScopes([](const ScopeImplSharedPtr& scope) -> bool {
ASSERT(scope->centralCacheLockHeld()->histograms_.empty());
return true;
});
histogram_settings_ = std::move(histogram_settings);
}

Expand All @@ -67,21 +69,23 @@ void ThreadLocalStoreImpl::setStatsMatcher(StatsMatcherPtr&& stats_matcher) {
// be no copies in TLS caches.
Thread::LockGuard lock(lock_);
const uint32_t first_histogram_index = deleted_histograms_.size();
for (ScopeImpl* scope : scopes_) {
removeRejectedStats<CounterSharedPtr>(scope->central_cache_->counters_,
iterateScopesLockHeld([this](const ScopeImplSharedPtr& scope) ABSL_EXCLUSIVE_LOCKS_REQUIRED(
lock_) -> bool {
const CentralCacheEntrySharedPtr& central_cache = scope->centralCacheLockHeld();
removeRejectedStats<CounterSharedPtr>(central_cache->counters_,
[this](const CounterSharedPtr& counter) mutable {
alloc_.markCounterForDeletion(counter);
});
removeRejectedStats<GaugeSharedPtr>(
scope->central_cache_->gauges_,
central_cache->gauges_,
[this](const GaugeSharedPtr& gauge) mutable { alloc_.markGaugeForDeletion(gauge); });
removeRejectedStats(scope->central_cache_->histograms_, deleted_histograms_);
removeRejectedStats(central_cache->histograms_, deleted_histograms_);
removeRejectedStats<TextReadoutSharedPtr>(
scope->central_cache_->text_readouts_,
[this](const TextReadoutSharedPtr& text_readout) mutable {
central_cache->text_readouts_, [this](const TextReadoutSharedPtr& text_readout) mutable {
alloc_.markTextReadoutForDeletion(text_readout);
});
}
return true;
});

// Remove any newly rejected histograms from histogram_set_.
{
Expand Down Expand Up @@ -151,7 +155,7 @@ ScopeSharedPtr ThreadLocalStoreImpl::createScope(const std::string& name) {
ScopeSharedPtr ThreadLocalStoreImpl::scopeFromStatName(StatName name) {
auto new_scope = std::make_shared<ScopeImpl>(*this, name);
Thread::LockGuard lock(lock_);
scopes_.emplace(new_scope.get());
scopes_[new_scope.get()] = std::weak_ptr<ScopeImpl>(new_scope);
return new_scope;
}

Expand Down Expand Up @@ -283,7 +287,7 @@ void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) {
// VirtualHosts.
bool need_post = scopes_to_cleanup_.empty();
scopes_to_cleanup_.push_back(scope->scope_id_);
central_cache_entries_to_cleanup_.push_back(scope->central_cache_);
central_cache_entries_to_cleanup_.push_back(scope->centralCacheLockHeld());
lock.release();

if (need_post) {
Expand Down Expand Up @@ -386,6 +390,11 @@ ThreadLocalStoreImpl::ScopeImpl::ScopeImpl(ThreadLocalStoreImpl& parent, StatNam
central_cache_(new CentralCacheEntry(parent.alloc_.symbolTable())) {}

ThreadLocalStoreImpl::ScopeImpl::~ScopeImpl() {
// Helps reproduce a previous race condition by pausing here in tests while we
// loop over scopes. 'this' will not have been removed from the scopes_ table
// yet, so we need to be careful.
parent_.sync_.syncPoint(DeleteScopeSync);

// Note that scope iteration is thread-safe due to the lock held in
// releaseScopeCrossThread. For more details see the comment in
// `ThreadLocalStoreImpl::iterHelper`, and the lock it takes prior to the loop.
Expand Down Expand Up @@ -460,6 +469,44 @@ bool ThreadLocalStoreImpl::checkAndRememberRejection(StatName name,
return false;
}

// Searches every live scope's central cache for a counter with the given
// name. The walk stops at the first scope that contains it; synchronization
// is delegated to iterateScopes (see iterateScopesLockHeld).
CounterOptConstRef ThreadLocalStoreImpl::findCounter(StatName name) const {
  CounterOptConstRef result;
  auto visit = [&result, name](const ScopeImplSharedPtr& scope) -> bool {
    result = scope->findStatLockHeld<Counter>(name, scope->centralCacheLockHeld()->counters_);
    // Continue iterating only while no match has been found.
    return !result.has_value();
  };
  iterateScopes(visit);
  return result;
}

// Searches every live scope's central cache for a gauge with the given name.
// The walk stops at the first scope that contains it; synchronization is
// delegated to iterateScopes (see iterateScopesLockHeld).
GaugeOptConstRef ThreadLocalStoreImpl::findGauge(StatName name) const {
  GaugeOptConstRef result;
  auto visit = [&result, name](const ScopeImplSharedPtr& scope) -> bool {
    result = scope->findStatLockHeld<Gauge>(name, scope->centralCacheLockHeld()->gauges_);
    // Continue iterating only while no match has been found.
    return !result.has_value();
  };
  iterateScopes(visit);
  return result;
}

// Searches every live scope for a histogram with the given name. The walk
// stops at the first scope that contains it; synchronization is delegated to
// iterateScopes (see iterateScopesLockHeld).
HistogramOptConstRef ThreadLocalStoreImpl::findHistogram(StatName name) const {
  HistogramOptConstRef result;
  auto visit = [&result, name](const ScopeImplSharedPtr& scope) -> bool {
    result = scope->findHistogramLockHeld(name);
    // Continue iterating only while no match has been found.
    return !result.has_value();
  };
  iterateScopes(visit);
  return result;
}

// Searches every live scope's central cache for a text readout with the
// given name. The walk stops at the first scope that contains it;
// synchronization is delegated to iterateScopes (see iterateScopesLockHeld).
TextReadoutOptConstRef ThreadLocalStoreImpl::findTextReadout(StatName name) const {
  TextReadoutOptConstRef result;
  auto visit = [&result, name](const ScopeImplSharedPtr& scope) -> bool {
    result =
        scope->findStatLockHeld<TextReadout>(name, scope->centralCacheLockHeld()->text_readouts_);
    // Continue iterating only while no match has been found.
    return !result.has_value();
  };
  iterateScopes(visit);
  return result;
}

template <class StatType>
StatType& ThreadLocalStoreImpl::ScopeImpl::safeMakeStat(
StatName full_stat_name, StatName name_no_tags,
Expand Down Expand Up @@ -512,20 +559,6 @@ StatType& ThreadLocalStoreImpl::ScopeImpl::safeMakeStat(
return ret;
}

template <class StatType>
using StatTypeOptConstRef = absl::optional<std::reference_wrapper<const StatType>>;

template <class StatType>
StatTypeOptConstRef<StatType> ThreadLocalStoreImpl::ScopeImpl::findStatLockHeld(
StatName name, StatNameHashMap<RefcountPtr<StatType>>& central_cache_map) const {
auto iter = central_cache_map.find(name);
if (iter == central_cache_map.end()) {
return absl::nullopt;
}

return std::cref(*iter->second);
}

Counter& ThreadLocalStoreImpl::ScopeImpl::counterFromStatNameWithTags(
const StatName& name, StatNameTagVectorOptConstRef stat_name_tags) {
if (parent_.rejectsAll()) {
Expand All @@ -551,9 +584,10 @@ Counter& ThreadLocalStoreImpl::ScopeImpl::counterFromStatNameWithTags(
tls_rejected_stats = &entry.rejected_stats_;
}

const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
return safeMakeStat<Counter>(
final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache_->counters_,
fast_reject_result, central_cache_->rejected_stats_,
final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->counters_,
fast_reject_result, central_cache->rejected_stats_,
[](Allocator& allocator, StatName name, StatName tag_extracted_name,
const StatNameTagVector& tags) -> CounterSharedPtr {
return allocator.makeCounter(name, tag_extracted_name, tags);
Expand Down Expand Up @@ -602,9 +636,10 @@ Gauge& ThreadLocalStoreImpl::ScopeImpl::gaugeFromStatNameWithTags(
tls_rejected_stats = &entry.rejected_stats_;
}

const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
Gauge& gauge = safeMakeStat<Gauge>(
final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache_->gauges_,
fast_reject_result, central_cache_->rejected_stats_,
final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->gauges_,
fast_reject_result, central_cache->rejected_stats_,
[import_mode](Allocator& allocator, StatName name, StatName tag_extracted_name,
const StatNameTagVector& tags) -> GaugeSharedPtr {
return allocator.makeGauge(name, tag_extracted_name, tags, import_mode);
Expand All @@ -616,6 +651,8 @@ Gauge& ThreadLocalStoreImpl::ScopeImpl::gaugeFromStatNameWithTags(

Histogram& ThreadLocalStoreImpl::ScopeImpl::histogramFromStatNameWithTags(
const StatName& name, StatNameTagVectorOptConstRef stat_name_tags, Histogram::Unit unit) {
// See safety analysis comment in counterFromStatNameWithTags above.

if (parent_.rejectsAll()) {
return parent_.null_histogram_;
}
Expand Down Expand Up @@ -646,12 +683,13 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogramFromStatNameWithTags(
}

Thread::LockGuard lock(parent_.lock_);
auto iter = central_cache_->histograms_.find(final_stat_name);
const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
auto iter = central_cache->histograms_.find(final_stat_name);
ParentHistogramImplSharedPtr* central_ref = nullptr;
if (iter != central_cache_->histograms_.end()) {
if (iter != central_cache->histograms_.end()) {
central_ref = &iter->second;
} else if (parent_.checkAndRememberRejection(final_stat_name, fast_reject_result,
central_cache_->rejected_stats_,
central_cache->rejected_stats_,
tls_rejected_stats)) {
return parent_.null_histogram_;
} else {
Expand All @@ -676,7 +714,7 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogramFromStatNameWithTags(
}
}

central_ref = &central_cache_->histograms_[stat->statName()];
central_ref = &central_cache->histograms_[stat->statName()];
*central_ref = stat;
}

Expand Down Expand Up @@ -711,9 +749,10 @@ TextReadout& ThreadLocalStoreImpl::ScopeImpl::textReadoutFromStatNameWithTags(
tls_rejected_stats = &entry.rejected_stats_;
}

const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
return safeMakeStat<TextReadout>(
final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache_->text_readouts_,
fast_reject_result, central_cache_->rejected_stats_,
final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->text_readouts_,
fast_reject_result, central_cache->rejected_stats_,
[](Allocator& allocator, StatName name, StatName tag_extracted_name,
const StatNameTagVector& tags) -> TextReadoutSharedPtr {
return allocator.makeTextReadout(name, tag_extracted_name, tags);
Expand All @@ -722,14 +761,22 @@ TextReadout& ThreadLocalStoreImpl::ScopeImpl::textReadoutFromStatNameWithTags(
}

// Looks up an existing counter in this scope's central cache, holding the
// store-wide lock for the duration of the lookup. Returns nullopt if absent.
CounterOptConstRef ThreadLocalStoreImpl::ScopeImpl::findCounter(StatName name) const {
  Thread::LockGuard guard(parent_.lock_);
  auto& counters = central_cache_->counters_;
  return findStatLockHeld<Counter>(name, counters);
}

// Looks up an existing gauge in this scope's central cache, holding the
// store-wide lock for the duration of the lookup. Returns nullopt if absent.
GaugeOptConstRef ThreadLocalStoreImpl::ScopeImpl::findGauge(StatName name) const {
  Thread::LockGuard guard(parent_.lock_);
  auto& gauges = central_cache_->gauges_;
  return findStatLockHeld<Gauge>(name, gauges);
}

// Looks up an existing histogram in this scope. Takes the store-wide lock and
// then delegates to findHistogramLockHeld, which performs the actual search.
HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogram(StatName name) const {
  Thread::LockGuard lock(parent_.lock_);
  return findHistogramLockHeld(name);
}

HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogramLockHeld(StatName name) const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_.lock_) {
auto iter = central_cache_->histograms_.find(name);
if (iter == central_cache_->histograms_.end()) {
return absl::nullopt;
Expand All @@ -740,6 +787,7 @@ HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogram(StatName nam
}

// Looks up an existing text readout in this scope's central cache, holding
// the store-wide lock for the duration of the lookup. Returns nullopt if
// absent.
TextReadoutOptConstRef ThreadLocalStoreImpl::ScopeImpl::findTextReadout(StatName name) const {
  Thread::LockGuard guard(parent_.lock_);
  auto& text_readouts = central_cache_->text_readouts_;
  return findStatLockHeld<TextReadout>(name, text_readouts);
}

Expand Down Expand Up @@ -972,15 +1020,39 @@ void ThreadLocalStoreImpl::forEachHistogram(SizeFn f_size, StatFn<ParentHistogra

void ThreadLocalStoreImpl::forEachScope(std::function<void(std::size_t)> f_size,
StatFn<const Scope> f_scope) const {
Thread::LockGuard lock(lock_);
std::vector<ScopeSharedPtr> scopes;
iterateScopes([&scopes](const ScopeImplSharedPtr& scope) -> bool {
scopes.push_back(scope);
return true;
});

if (f_size != nullptr) {
f_size(scopes_.size());
f_size(scopes.size());
}
for (ScopeImpl* scope : scopes_) {
for (const ScopeSharedPtr& scope : scopes) {
f_scope(*scope);
}
}

// Visits every scope registered in scopes_, invoking fn on each one that is
// still alive. Stops and returns false as soon as fn returns false; returns
// true if the full walk completes. Must be called with lock_ held.
bool ThreadLocalStoreImpl::iterateScopesLockHeld(
    const std::function<bool(const ScopeImplSharedPtr&)> fn) const
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
  for (const auto& entry : scopes_) {
    // Test hook: lets tests pause mid-iteration to reproduce races with
    // concurrent scope deletion.
    sync_.syncPoint(ThreadLocalStoreImpl::IterateScopeSync);

    // scopes_ maps Scope* to weak_ptr<Scope> precisely so that a scope whose
    // last shared reference was dropped during this iteration shows up here
    // as a null lock() result and can be skipped instead of dereferenced.
    ScopeImplSharedPtr scope = entry.second.lock();
    if (scope != nullptr && !fn(scope)) {
      return false;
    }
  }
  return true;
}

// Iterates over sinked counters by delegating directly to the allocator,
// which tracks them; this store adds no filtering or locking of its own here.
void ThreadLocalStoreImpl::forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
  alloc_.forEachSinkedCounter(f_size, f_stat);
}
Expand Down
Loading