diff --git a/envoy/server/instance.h b/envoy/server/instance.h index 880aa4889b376..f779abb42a173 100644 --- a/envoy/server/instance.h +++ b/envoy/server/instance.h @@ -2,6 +2,7 @@ #include #include +#include #include #include "envoy/access_log/access_log.h" @@ -31,6 +32,11 @@ #include "envoy/upstream/cluster_manager.h" namespace Envoy { + +namespace Stats { +class SinkPredicates; +} + namespace Server { /** @@ -269,6 +275,12 @@ class Instance { * TODO(mattklein123): This can be removed when version 1.20.0 is no longer supported. */ virtual bool enableReusePortDefault() PURE; + + /** + * Set predicates for filtering counters, gauges and text readouts to be flushed to sinks. + */ + virtual void + setSinkPredicates(std::unique_ptr&& sink_predicates) PURE; }; } // namespace Server diff --git a/envoy/stats/allocator.h b/envoy/stats/allocator.h index 2924ebf0ab303..223b1cab47068 100644 --- a/envoy/stats/allocator.h +++ b/envoy/stats/allocator.h @@ -18,6 +18,9 @@ namespace Envoy { namespace Stats { +class Sink; +class SinkPredicates; + /** * Abstract interface for allocating statistics. Implementations can * be created utilizing a single fixed-size block suitable for @@ -70,19 +73,32 @@ class Allocator { virtual void markTextReadoutForDeletion(const TextReadoutSharedPtr& text_readout) PURE; /** - * Iterate over all stats that need to be added to a sink. Note, that implementations can + * Iterate over all stats. Note, that implementations can potentially hold on to a mutex that + * will deadlock if the passed in functors try to create or delete a stat. + * @param f_size functor that is provided the current number of all stats. Note that this is + * called only once, prior to any calls to f_stat. + * @param f_stat functor that is provided one stat at a time from the stats container. + */ + virtual void forEachCounter(SizeFn f_size, StatFn f_stat) const PURE; + virtual void forEachGauge(SizeFn f_size, StatFn f_stat) const PURE; + virtual void forEachTextReadout(SizeFn f_size, StatFn f_stat) const PURE; + + /** + * Iterate over all stats that need to be flushed to sinks. Note, that implementations can * potentially hold on to a mutex that will deadlock if the passed in functors try to create * or delete a stat. - * @param f_size functor that is provided the number of all stats in the sink. Note this is - * called only once, prior to any calls to f_stat. - * @param f_stat functor that is provided one stat in the sink at a time. + * @param f_size functor that is provided the number of all stats that will be flushed to sinks. + * Note that this is called only once, prior to any calls to f_stat. + * @param f_stat functor that is provided one stat that will be flushed to sinks, at a time. + */ + virtual void forEachSinkedCounter(SizeFn f_size, StatFn f_stat) const PURE; + virtual void forEachSinkedGauge(SizeFn f_size, StatFn f_stat) const PURE; + virtual void forEachSinkedTextReadout(SizeFn f_size, StatFn f_stat) const PURE; + + /** + * Set the predicates to filter counters, gauges and text readouts for sink. */ - virtual void forEachCounter(std::function f_size, - std::function f_stat) const PURE; - virtual void forEachGauge(std::function f_size, - std::function f_stat) const PURE; - virtual void forEachTextReadout(std::function f_size, - std::function f_stat) const PURE; + virtual void setSinkPredicates(std::unique_ptr&& sink_predicates) PURE; // TODO(jmarantz): create a parallel mechanism to instantiate histograms. At // the moment, histograms don't fit the same pattern of counters and gauges diff --git a/envoy/stats/sink.h b/envoy/stats/sink.h index ff0e607ffaa8c..a0d914416bd9d 100644 --- a/envoy/stats/sink.h +++ b/envoy/stats/sink.h @@ -48,6 +48,29 @@ class MetricSnapshot { virtual SystemTime snapshotTime() const PURE; }; +/** + * A class to define predicates to filter counters, gauges and text readouts for flushing to sinks. + */ +class SinkPredicates { +public: + virtual ~SinkPredicates() = default; + + /** + * @return true if @param counter needs to be flushed to sinks. + */ + virtual bool includeCounter(const Counter& counter) PURE; + + /** + * @return true if @param gague needs to be flushed to sinks. + */ + virtual bool includeGauge(const Gauge& gauge) PURE; + + /** + * @return true if @param text_readout needs to be flushed to sinks. + */ + virtual bool includeTextReadout(const TextReadout& text_readout) PURE; +}; + /** * A sink for stats. Each sink is responsible for writing stats to a backing store. */ diff --git a/envoy/stats/stats.h b/envoy/stats/stats.h index 4a9688b3fd42a..9fba64c34ccfa 100644 --- a/envoy/stats/stats.h +++ b/envoy/stats/stats.h @@ -190,5 +190,15 @@ class TextReadout : public virtual Metric { using TextReadoutSharedPtr = RefcountPtr; +/** + * Callback invoked to provide size of stats container. + */ +using SizeFn = std::function; + +/** + * Callback invoked for each stat during iteration. + */ +template using StatFn = std::function; + } // namespace Stats } // namespace Envoy diff --git a/envoy/stats/store.h b/envoy/stats/store.h index 3d456bbe7bec9..a54a170365787 100644 --- a/envoy/stats/store.h +++ b/envoy/stats/store.h @@ -24,6 +24,7 @@ class Instance; namespace Stats { class Sink; +class SinkPredicates; /** * A store for all known counters, gauges, and timers. @@ -51,20 +52,27 @@ class Store : public Scope { virtual std::vector histograms() const PURE; /** - * Iterate over all stats that need to be added to a sink. Note, that implementations can - * potentially hold on to a mutex that will deadlock if the passed in functors try to create - * or delete a stat. - * @param f_size functor that is provided the number of all stats in the sink. - * @param f_stat functor that is provided one stat in the sink at a time. + * Iterate over all stats. Note, that implementations can potentially hold on to a mutex that + * will deadlock if the passed in functors try to create or delete a stat. + * @param f_size functor that is provided the current number of all stats. Note that this is + * called only once, prior to any calls to f_stat. + * @param f_stat functor that is provided one stat at a time from the stats container. */ - virtual void forEachCounter(std::function f_size, - std::function f_stat) const PURE; - - virtual void forEachGauge(std::function f_size, - std::function f_stat) const PURE; + virtual void forEachCounter(SizeFn f_size, StatFn f_stat) const PURE; + virtual void forEachGauge(SizeFn f_size, StatFn f_stat) const PURE; + virtual void forEachTextReadout(SizeFn f_size, StatFn f_stat) const PURE; - virtual void forEachTextReadout(std::function f_size, - std::function f_stat) const PURE; + /** + * Iterate over all stats that need to be flushed to sinks. Note, that implementations can + * potentially hold on to a mutex that will deadlock if the passed in functors try to create + * or delete a stat. + * @param f_size functor that is provided the number of all stats that will be flushed to sinks. + * Note that this is called only once, prior to any calls to f_stat. + * @param f_stat functor that is provided one stat that will be flushed to sinks, at a time. + */ + virtual void forEachSinkedCounter(SizeFn f_size, StatFn f_stat) const PURE; + virtual void forEachSinkedGauge(SizeFn f_size, StatFn f_stat) const PURE; + virtual void forEachSinkedTextReadout(SizeFn f_size, StatFn f_stat) const PURE; }; using StorePtr = std::unique_ptr; @@ -123,6 +131,14 @@ class StoreRoot : public Store { * method would be asserted. */ virtual void mergeHistograms(PostMergeCb merge_complete_cb) PURE; + + /** + * Set predicates for filtering counters, gauges and text readouts to be flushed to sinks. + * Note that if the sink predicates object is set, we do not send non-sink stats over to the + * child process during hot restart. This will result in the admin stats console being wrong + * during hot restart. + */ + virtual void setSinkPredicates(std::unique_ptr&& sink_predicates) PURE; }; using StoreRootPtr = std::unique_ptr; diff --git a/source/common/stats/allocator_impl.cc b/source/common/stats/allocator_impl.cc index 9e8a37705e4d2..d0103b1065c7f 100644 --- a/source/common/stats/allocator_impl.cc +++ b/source/common/stats/allocator_impl.cc @@ -3,6 +3,7 @@ #include #include +#include "envoy/stats/sink.h" #include "envoy/stats/stats.h" #include "envoy/stats/symbol_table.h" @@ -144,6 +145,7 @@ class CounterImpl : public StatsSharedImpl { void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override { const size_t count = alloc_.counters_.erase(statName()); ASSERT(count == 1); + alloc_.sinked_counters_.erase(this); } // Stats::Counter @@ -188,6 +190,7 @@ class GaugeImpl : public StatsSharedImpl { void removeFromSetLockHeld() override ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) { const size_t count = alloc_.gauges_.erase(statName()); ASSERT(count == 1); + alloc_.sinked_gauges_.erase(this); } // Stats::Gauge @@ -260,6 +263,7 @@ class TextReadoutImpl : public StatsSharedImpl { void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override { const size_t count = alloc_.text_readouts_.erase(statName()); ASSERT(count == 1); + alloc_.sinked_text_readouts_.erase(this); } // Stats::TextReadout @@ -289,6 +293,11 @@ CounterSharedPtr AllocatorImpl::makeCounter(StatName name, StatName tag_extracte } auto counter = CounterSharedPtr(makeCounterInternal(name, tag_extracted_name, stat_name_tags)); counters_.insert(counter.get()); + // Add counter to sinked_counters_ if it matches the sink predicate. + if (sink_predicates_ != nullptr && sink_predicates_->includeCounter(*counter)) { + auto val = sinked_counters_.insert(counter.get()); + ASSERT(val.second); + } return counter; } @@ -305,6 +314,11 @@ GaugeSharedPtr AllocatorImpl::makeGauge(StatName name, StatName tag_extracted_na auto gauge = GaugeSharedPtr(new GaugeImpl(name, *this, tag_extracted_name, stat_name_tags, import_mode)); gauges_.insert(gauge.get()); + // Add gauge to sinked_gauges_ if it matches the sink predicate. + if (sink_predicates_ != nullptr && sink_predicates_->includeGauge(*gauge)) { + auto val = sinked_gauges_.insert(gauge.get()); + ASSERT(val.second); + } return gauge; } @@ -320,6 +334,11 @@ TextReadoutSharedPtr AllocatorImpl::makeTextReadout(StatName name, StatName tag_ auto text_readout = TextReadoutSharedPtr(new TextReadoutImpl(name, *this, tag_extracted_name, stat_name_tags)); text_readouts_.insert(text_readout.get()); + // Add text_readout to sinked_text_readouts_ if it matches the sink predicate. + if (sink_predicates_ != nullptr && sink_predicates_->includeTextReadout(*text_readout)) { + auto val = sinked_text_readouts_.insert(text_readout.get()); + ASSERT(val.second); + } return text_readout; } @@ -336,8 +355,7 @@ Counter* AllocatorImpl::makeCounterInternal(StatName name, StatName tag_extracte return new CounterImpl(name, *this, tag_extracted_name, stat_name_tags); } -void AllocatorImpl::forEachCounter(std::function f_size, - std::function f_stat) const { +void AllocatorImpl::forEachCounter(SizeFn f_size, StatFn f_stat) const { Thread::LockGuard lock(mutex_); if (f_size != nullptr) { f_size(counters_.size()); @@ -347,8 +365,7 @@ void AllocatorImpl::forEachCounter(std::function f_size, } } -void AllocatorImpl::forEachGauge(std::function f_size, - std::function f_stat) const { +void AllocatorImpl::forEachGauge(SizeFn f_size, StatFn f_stat) const { Thread::LockGuard lock(mutex_); if (f_size != nullptr) { f_size(gauges_.size()); @@ -358,8 +375,7 @@ void AllocatorImpl::forEachGauge(std::function f_size, } } -void AllocatorImpl::forEachTextReadout(std::function f_size, - std::function f_stat) const { +void AllocatorImpl::forEachTextReadout(SizeFn f_size, StatFn f_stat) const { Thread::LockGuard lock(mutex_); if (f_size != nullptr) { f_size(text_readouts_.size()); @@ -369,6 +385,69 @@ void AllocatorImpl::forEachTextReadout(std::function f_size, } } +void AllocatorImpl::forEachSinkedCounter(SizeFn f_size, StatFn f_stat) const { + if (sink_predicates_ != nullptr) { + Thread::LockGuard lock(mutex_); + f_size(sinked_counters_.size()); + for (auto counter : sinked_counters_) { + f_stat(*counter); + } + } else { + forEachCounter(f_size, f_stat); + } +} + +void AllocatorImpl::forEachSinkedGauge(SizeFn f_size, StatFn f_stat) const { + if (sink_predicates_ != nullptr) { + Thread::LockGuard lock(mutex_); + f_size(sinked_gauges_.size()); + for (auto gauge : sinked_gauges_) { + f_stat(*gauge); + } + } else { + forEachGauge(f_size, f_stat); + } +} + +void AllocatorImpl::forEachSinkedTextReadout(SizeFn f_size, StatFn f_stat) const { + if (sink_predicates_ != nullptr) { + Thread::LockGuard lock(mutex_); + f_size(sinked_text_readouts_.size()); + for (auto text_readout : sinked_text_readouts_) { + f_stat(*text_readout); + } + } else { + forEachTextReadout(f_size, f_stat); + } +} + +void AllocatorImpl::setSinkPredicates(std::unique_ptr&& sink_predicates) { + Thread::LockGuard lock(mutex_); + ASSERT(sink_predicates_ == nullptr); + sink_predicates_ = std::move(sink_predicates); + sinked_counters_.clear(); + sinked_gauges_.clear(); + sinked_text_readouts_.clear(); + // Add counters to the set of sinked counters. + for (auto& counter : counters_) { + if (sink_predicates_->includeCounter(*counter)) { + sinked_counters_.emplace(counter); + } + } + // Add gauges to the set of sinked gauges. + for (auto& gauge : gauges_) { + if (sink_predicates_->includeGauge(*gauge)) { + sinked_gauges_.insert(gauge); + } + } + // Add text_readouts to the set of sinked text readouts. + for (auto& text_readout : text_readouts_) { + if (sink_predicates_->includeTextReadout(*text_readout)) { + sinked_text_readouts_.insert(text_readout); + } + } +} + void AllocatorImpl::markCounterForDeletion(const CounterSharedPtr& counter) { Thread::LockGuard lock(mutex_); auto iter = counters_.find(counter->statName()); @@ -380,6 +459,7 @@ void AllocatorImpl::markCounterForDeletion(const CounterSharedPtr& counter) { // Duplicates are ASSERTed in ~AllocatorImpl. deleted_counters_.emplace_back(*iter); counters_.erase(iter); + sinked_counters_.erase(counter.get()); } void AllocatorImpl::markGaugeForDeletion(const GaugeSharedPtr& gauge) { @@ -393,6 +473,7 @@ void AllocatorImpl::markGaugeForDeletion(const GaugeSharedPtr& gauge) { // Duplicates are ASSERTed in ~AllocatorImpl. deleted_gauges_.emplace_back(*iter); gauges_.erase(iter); + sinked_gauges_.erase(gauge.get()); } void AllocatorImpl::markTextReadoutForDeletion(const TextReadoutSharedPtr& text_readout) { @@ -406,6 +487,7 @@ void AllocatorImpl::markTextReadoutForDeletion(const TextReadoutSharedPtr& text_ // Duplicates are ASSERTed in ~AllocatorImpl. deleted_text_readouts_.emplace_back(*iter); text_readouts_.erase(iter); + sinked_text_readouts_.erase(text_readout.get()); } } // namespace Stats diff --git a/source/common/stats/allocator_impl.h b/source/common/stats/allocator_impl.h index 806e7dc8612fd..86a1e1aa664cf 100644 --- a/source/common/stats/allocator_impl.h +++ b/source/common/stats/allocator_impl.h @@ -2,7 +2,9 @@ #include +#include "envoy/common/optref.h" #include "envoy/stats/allocator.h" +#include "envoy/stats/sink.h" #include "envoy/stats/stats.h" #include "envoy/stats/symbol_table.h" @@ -33,15 +35,17 @@ class AllocatorImpl : public Allocator { SymbolTable& symbolTable() override { return symbol_table_; } const SymbolTable& constSymbolTable() const override { return symbol_table_; } - void forEachCounter(std::function, - std::function) const override; + void forEachCounter(SizeFn, StatFn) const override; - void forEachGauge(std::function, - std::function) const override; + void forEachGauge(SizeFn, StatFn) const override; - void forEachTextReadout(std::function, - std::function) const override; + void forEachTextReadout(SizeFn, StatFn) const override; + void forEachSinkedCounter(SizeFn f_size, StatFn f_stat) const override; + void forEachSinkedGauge(SizeFn f_size, StatFn f_stat) const override; + void forEachSinkedTextReadout(SizeFn f_size, StatFn f_stat) const override; + + void setSinkPredicates(std::unique_ptr&& sink_predicates) override; #ifndef ENVOY_CONFIG_COVERAGE void debugPrint(); #endif @@ -93,6 +97,14 @@ class AllocatorImpl : public Allocator { std::vector deleted_gauges_ ABSL_GUARDED_BY(mutex_); std::vector deleted_text_readouts_ ABSL_GUARDED_BY(mutex_); + template using StatPointerSet = absl::flat_hash_set; + // Stat pointers that participate in the flush to sink process. + StatPointerSet sinked_counters_ ABSL_GUARDED_BY(mutex_); + StatPointerSet sinked_gauges_ ABSL_GUARDED_BY(mutex_); + StatPointerSet sinked_text_readouts_ ABSL_GUARDED_BY(mutex_); + + // Predicates used to filter stats to be flushed. + std::unique_ptr sink_predicates_; SymbolTable& symbol_table_; Thread::ThreadSynchronizer sync_; diff --git a/source/common/stats/isolated_store_impl.h b/source/common/stats/isolated_store_impl.h index ebff944da7eff..cc44d8e811cac 100644 --- a/source/common/stats/isolated_store_impl.h +++ b/source/common/stats/isolated_store_impl.h @@ -101,9 +101,10 @@ template class IsolatedStatsCache { return true; } - void forEachStat(std::function f_size, - std::function f_stat) const { - f_size(stats_.size()); + void forEachStat(SizeFn f_size, std::function f_stat) const { + if (f_size != nullptr) { + f_size(stats_.size()); + } for (auto const& stat : stats_) { f_stat(*stat.second); } @@ -214,21 +215,30 @@ class IsolatedStoreImpl : public StoreImpl { return textReadoutFromStatName(storage.statName()); } - void forEachCounter(std::function f_size, - std::function f_stat) const override { + void forEachCounter(SizeFn f_size, StatFn f_stat) const override { counters_.forEachStat(f_size, f_stat); } - void forEachGauge(std::function f_size, - std::function f_stat) const override { + void forEachGauge(SizeFn f_size, StatFn f_stat) const override { gauges_.forEachStat(f_size, f_stat); } - void forEachTextReadout(std::function f_size, - std::function f_stat) const override { + void forEachTextReadout(SizeFn f_size, StatFn f_stat) const override { text_readouts_.forEachStat(f_size, f_stat); } + void forEachSinkedCounter(SizeFn f_size, StatFn f_stat) const override { + forEachCounter(f_size, f_stat); + } + + void forEachSinkedGauge(SizeFn f_size, StatFn f_stat) const override { + forEachGauge(f_size, f_stat); + } + + void forEachSinkedTextReadout(SizeFn f_size, StatFn f_stat) const override { + forEachTextReadout(f_size, f_stat); + } + private: IsolatedStoreImpl(std::unique_ptr&& symbol_table); diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 97960182096c0..f0c32033ceccf 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -954,24 +954,38 @@ bool ParentHistogramImpl::usedLockHeld() const { return false; } -void ThreadLocalStoreImpl::forEachCounter(std::function f_size, - std::function f_stat) const { - Thread::LockGuard lock(lock_); +void ThreadLocalStoreImpl::forEachCounter(SizeFn f_size, StatFn f_stat) const { alloc_.forEachCounter(f_size, f_stat); } -void ThreadLocalStoreImpl::forEachGauge(std::function f_size, - std::function f_stat) const { - Thread::LockGuard lock(lock_); +void ThreadLocalStoreImpl::forEachGauge(SizeFn f_size, StatFn f_stat) const { alloc_.forEachGauge(f_size, f_stat); } -void ThreadLocalStoreImpl::forEachTextReadout( - std::function f_size, - std::function f_stat) const { - Thread::LockGuard lock(lock_); +void ThreadLocalStoreImpl::forEachTextReadout(SizeFn f_size, StatFn f_stat) const { alloc_.forEachTextReadout(f_size, f_stat); } +void ThreadLocalStoreImpl::forEachSinkedCounter(SizeFn f_size, StatFn f_stat) const { + alloc_.forEachSinkedCounter(f_size, f_stat); +} + +void ThreadLocalStoreImpl::forEachSinkedGauge(SizeFn f_size, StatFn f_stat) const { + alloc_.forEachSinkedGauge(f_size, f_stat); +} + +void ThreadLocalStoreImpl::forEachSinkedTextReadout(SizeFn f_size, + StatFn f_stat) const { + alloc_.forEachSinkedTextReadout(f_size, f_stat); +} + +void ThreadLocalStoreImpl::setSinkPredicates(std::unique_ptr&& sink_predicates) { + ASSERT(sink_predicates != nullptr); + if (sink_predicates != nullptr) { + sink_predicates_.emplace(*sink_predicates); + alloc_.setSinkPredicates(std::move(sink_predicates)); + } +} + } // namespace Stats } // namespace Envoy diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 742e1fc3c04d6..a22a04d00f985 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -244,14 +244,9 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo std::vector textReadouts() const override; std::vector histograms() const override; - void forEachCounter(std::function f_size, - std::function f_stat) const override; - - void forEachGauge(std::function f_size, - std::function f_stat) const override; - - void forEachTextReadout(std::function f_size, - std::function f_stat) const override; + void forEachCounter(SizeFn f_size, StatFn f_stat) const override; + void forEachGauge(SizeFn f_size, StatFn f_stat) const override; + void forEachTextReadout(SizeFn f_size, StatFn f_stat) const override; // Stats::StoreRoot void addSink(Sink& sink) override { timer_sinks_.push_back(sink); } @@ -267,6 +262,12 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo Histogram& tlsHistogram(ParentHistogramImpl& parent, uint64_t id); + void forEachSinkedCounter(SizeFn f_size, StatFn f_stat) const override; + void forEachSinkedGauge(SizeFn f_size, StatFn f_stat) const override; + void forEachSinkedTextReadout(SizeFn f_size, StatFn f_stat) const override; + + void setSinkPredicates(std::unique_ptr&& sink_predicates) override; + /** * @return a thread synchronizer object used for controlling thread behavior in tests. */ @@ -500,6 +501,7 @@ class ThreadLocalStoreImpl : Logger::Loggable, public StoreRo StatNameHashSet* tls_rejected_stats); TlsCache& tlsCache() { return **tls_cache_; } + OptRef sink_predicates_; Allocator& alloc_; Event::Dispatcher* main_thread_dispatcher_{}; using TlsCacheSlot = ThreadLocal::TypedSlotPtr; diff --git a/source/server/config_validation/server.h b/source/server/config_validation/server.h index a48faf8554238..6df0b4c67e725 100644 --- a/source/server/config_validation/server.h +++ b/source/server/config_validation/server.h @@ -125,6 +125,7 @@ class ValidationInstance final : Logger::Loggable, void setDefaultTracingConfig(const envoy::config::trace::v3::Tracing& tracing_config) override { http_context_.setDefaultTracingConfig(tracing_config); } + void setSinkPredicates(std::unique_ptr&&) override {} // Server::ListenerComponentFactory LdsApiPtr createLdsApi(const envoy::config::core::v3::ConfigSource& lds_config, diff --git a/source/server/hot_restarting_parent.cc b/source/server/hot_restarting_parent.cc index 14c012b70cbca..fab51c4fc7b53 100644 --- a/source/server/hot_restarting_parent.cc +++ b/source/server/hot_restarting_parent.cc @@ -128,26 +128,26 @@ HotRestartingParent::Internal::getListenSocketsForChild(const HotRestartMessage: // magnitude of memory usage that they are meant to avoid, since this map holds full-string // names. The problem can be solved by splitting the export up over many chunks. void HotRestartingParent::Internal::exportStatsToChild(HotRestartMessage::Reply::Stats* stats) { - for (const auto& gauge : server_->stats().gauges()) { - if (gauge->used()) { - const std::string name = gauge->name(); - (*stats->mutable_gauges())[name] = gauge->value(); - recordDynamics(stats, name, gauge->statName()); + server_->stats().forEachSinkedGauge(nullptr, [this, stats](Stats::Gauge& gauge) mutable { + if (gauge.used()) { + const std::string name = gauge.name(); + (*stats->mutable_gauges())[name] = gauge.value(); + recordDynamics(stats, name, gauge.statName()); } - } + }); - for (const auto& counter : server_->stats().counters()) { - if (counter->used()) { + server_->stats().forEachSinkedCounter(nullptr, [this, stats](Stats::Counter& counter) mutable { + if (counter.used()) { // The hot restart parent is expected to have stopped its normal stat exporting (and so // latching) by the time it begins exporting to the hot restart child. - uint64_t latched_value = counter->latch(); + uint64_t latched_value = counter.latch(); if (latched_value > 0) { - const std::string name = counter->name(); + const std::string name = counter.name(); (*stats->mutable_counter_deltas())[name] = latched_value; - recordDynamics(stats, name, counter->statName()); + recordDynamics(stats, name, counter.statName()); } } - } + }); stats->set_memory_allocated(Memory::Stats::totalCurrentlyAllocated()); stats->set_num_connections(server_->listenerManager().numConnections()); } diff --git a/source/server/server.cc b/source/server/server.cc index c914699197e6b..98973229f182a 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -20,6 +20,7 @@ #include "envoy/server/bootstrap_extension_config.h" #include "envoy/server/instance.h" #include "envoy/server/options.h" +#include "envoy/stats/histogram.h" #include "envoy/stats/stats.h" #include "envoy/upstream/cluster_manager.h" @@ -168,7 +169,7 @@ void InstanceImpl::failHealthcheck(bool fail) { } MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store, TimeSource& time_source) { - store.forEachCounter( + store.forEachSinkedCounter( [this](std::size_t size) mutable { snapped_counters_.reserve(size); counters_.reserve(size); @@ -178,7 +179,7 @@ MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store, TimeSource& time_sou counters_.push_back({counter.latch(), counter}); }); - store.forEachGauge( + store.forEachSinkedGauge( [this](std::size_t size) mutable { snapped_gauges_.reserve(size); gauges_.reserve(size); @@ -195,7 +196,7 @@ MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store, TimeSource& time_sou histograms_.push_back(*histogram); } - store.forEachTextReadout( + store.forEachSinkedTextReadout( [this](std::size_t size) mutable { snapped_text_readouts_.reserve(size); text_readouts_.reserve(size); diff --git a/source/server/server.h b/source/server/server.h index 19ac1b5ce169a..5fa20d0a0cd01 100644 --- a/source/server/server.h +++ b/source/server/server.h @@ -293,6 +293,10 @@ class InstanceImpl final : Logger::Loggable, Quic::QuicStatNames& quicStatNames() { return quic_stat_names_; } + void setSinkPredicates(std::unique_ptr&& sink_predicates) override { + stats_store_.setSinkPredicates(std::move(sink_predicates)); + } + // ServerLifecycleNotifier ServerLifecycleNotifier::HandlePtr registerCallback(Stage stage, StageCallback callback) override; ServerLifecycleNotifier::HandlePtr diff --git a/test/common/stats/allocator_impl_test.cc b/test/common/stats/allocator_impl_test.cc index cc06acedaef1e..a4c0eb2d271b4 100644 --- a/test/common/stats/allocator_impl_test.cc +++ b/test/common/stats/allocator_impl_test.cc @@ -1,6 +1,9 @@ #include +#include #include +#include "envoy/stats/sink.h" + #include "source/common/stats/allocator_impl.h" #include "test/test_common/logging.h" @@ -41,6 +44,26 @@ class AllocatorImplTest : public testing::Test { bool are_stats_marked_for_deletion_ = false; }; +class TestSinkPredicates : public SinkPredicates { +public: + ~TestSinkPredicates() override = default; + StatNameHashSet& sinkedStatNames() { return sinked_stat_names_; } + + // SinkPredicates + bool includeCounter(const Counter& counter) override { + return sinked_stat_names_.find(counter.statName()) != sinked_stat_names_.end(); + } + bool includeGauge(const Gauge& gauge) override { + return sinked_stat_names_.find(gauge.statName()) != sinked_stat_names_.end(); + } + bool includeTextReadout(const TextReadout& text_readout) override { + return sinked_stat_names_.find(text_readout.statName()) != sinked_stat_names_.end(); + } + +private: + StatNameHashSet sinked_stat_names_; +}; + // Allocate 2 counters of the same name, and you'll get the same object. TEST_F(AllocatorImplTest, CountersWithSameName) { StatName counter_name = makeStat("counter.name"); @@ -309,7 +332,7 @@ TEST_F(AllocatorImplTest, ForEachWithNullSizeLambda) { } size_t num_iterations = 0; alloc_.forEachCounter(nullptr, [&num_iterations](Stats::Counter& counter) { - (void)counter; + UNREFERENCED_PARAMETER(counter); ++num_iterations; }); EXPECT_EQ(num_iterations, num_stats); @@ -321,7 +344,7 @@ TEST_F(AllocatorImplTest, ForEachWithNullSizeLambda) { } num_iterations = 0; alloc_.forEachGauge(nullptr, [&num_iterations](Stats::Gauge& gauge) { - (void)gauge; + UNREFERENCED_PARAMETER(gauge); ++num_iterations; }); EXPECT_EQ(num_iterations, num_stats); @@ -333,7 +356,7 @@ TEST_F(AllocatorImplTest, ForEachWithNullSizeLambda) { } num_iterations = 0; alloc_.forEachTextReadout(nullptr, [&num_iterations](Stats::TextReadout& text_readout) { - (void)text_readout; + UNREFERENCED_PARAMETER(text_readout); ++num_iterations; }); EXPECT_EQ(num_iterations, num_stats); @@ -410,6 +433,143 @@ TEST_F(AllocatorImplTest, AskForDeletedStat) { EXPECT_EQ(rejected_text_readout.value(), "deleted value"); } +TEST_F(AllocatorImplTest, ForEachSinkedCounter) { + std::unique_ptr moved_sink_predicates = + std::make_unique(); + TestSinkPredicates* sink_predicates = moved_sink_predicates.get(); + std::vector sinked_counters; + std::vector unsinked_counters; + + alloc_.setSinkPredicates(std::move(moved_sink_predicates)); + + const size_t num_stats = 11; + + for (size_t idx = 0; idx < num_stats; ++idx) { + auto stat_name = makeStat(absl::StrCat("counter.", idx)); + // sink every 3rd stat + if ((idx + 1) % 3 == 0) { + sink_predicates->sinkedStatNames().insert(stat_name); + sinked_counters.emplace_back(alloc_.makeCounter(stat_name, StatName(), {})); + } else { + unsinked_counters.emplace_back(alloc_.makeCounter(stat_name, StatName(), {})); + } + } + + EXPECT_EQ(sinked_counters.size(), 3); + EXPECT_EQ(unsinked_counters.size(), 8); + + size_t num_sinked_counters = 0; + size_t num_iterations = 0; + alloc_.forEachSinkedCounter( + [&num_sinked_counters](std::size_t size) { num_sinked_counters = size; }, + [&num_iterations, sink_predicates](Stats::Counter& counter) { + EXPECT_EQ(sink_predicates->sinkedStatNames().count(counter.statName()), 1); + ++num_iterations; + }); + EXPECT_EQ(num_sinked_counters, 3); + EXPECT_EQ(num_iterations, 3); + + // Erase all sinked stats. + sinked_counters.clear(); + num_iterations = 0; + alloc_.forEachSinkedCounter( + [&num_sinked_counters](std::size_t size) { num_sinked_counters = size; }, + [&num_iterations](Stats::Counter&) { ++num_iterations; }); + EXPECT_EQ(num_sinked_counters, 0); + EXPECT_EQ(num_iterations, 0); +} + +TEST_F(AllocatorImplTest, ForEachSinkedGauge) { + std::unique_ptr moved_sink_predicates = + std::make_unique(); + TestSinkPredicates* sink_predicates = moved_sink_predicates.get(); + std::vector sinked_gauges; + std::vector unsinked_gauges; + + alloc_.setSinkPredicates(std::move(moved_sink_predicates)); + const size_t num_stats = 11; + + for (size_t idx = 0; idx < num_stats; ++idx) { + auto stat_name = makeStat(absl::StrCat("gauge.", idx)); + // sink every 5th stat + if ((idx + 1) % 5 == 0) { + sink_predicates->sinkedStatNames().insert(stat_name); + sinked_gauges.emplace_back( + alloc_.makeGauge(stat_name, StatName(), {}, Gauge::ImportMode::Accumulate)); + } else { + unsinked_gauges.emplace_back( + alloc_.makeGauge(stat_name, StatName(), {}, Gauge::ImportMode::Accumulate)); + } + } + + EXPECT_EQ(sinked_gauges.size(), 2); + EXPECT_EQ(unsinked_gauges.size(), 9); + + size_t num_sinked_gauges = 0; + size_t num_iterations = 0; + alloc_.forEachSinkedGauge([&num_sinked_gauges](std::size_t size) { num_sinked_gauges = size; }, + [&num_iterations, sink_predicates](Stats::Gauge& gauge) { + EXPECT_EQ(sink_predicates->sinkedStatNames().count(gauge.statName()), + 1); + ++num_iterations; + }); + EXPECT_EQ(num_sinked_gauges, 2); + EXPECT_EQ(num_iterations, 2); + + // Erase all sinked stats. + sinked_gauges.clear(); + num_iterations = 0; + alloc_.forEachSinkedGauge([&num_sinked_gauges](std::size_t size) { num_sinked_gauges = size; }, + [&num_iterations](Stats::Gauge&) { ++num_iterations; }); + EXPECT_EQ(num_sinked_gauges, 0); + EXPECT_EQ(num_iterations, 0); +} + +TEST_F(AllocatorImplTest, ForEachSinkedTextReadout) { + std::unique_ptr moved_sink_predicates = + std::make_unique(); + TestSinkPredicates* sink_predicates = moved_sink_predicates.get(); + std::vector sinked_text_readouts; + std::vector unsinked_text_readouts; + + alloc_.setSinkPredicates(std::move(moved_sink_predicates)); + const size_t num_stats = 11; + + for (size_t idx = 0; idx < num_stats; ++idx) { + auto stat_name = makeStat(absl::StrCat("text_readout.", idx)); + // sink every 2nd stat + if ((idx + 1) % 2 == 0) { + sink_predicates->sinkedStatNames().insert(stat_name); + sinked_text_readouts.emplace_back(alloc_.makeTextReadout(stat_name, StatName(), {})); + } else { + unsinked_text_readouts.emplace_back(alloc_.makeTextReadout(stat_name, StatName(), {})); + } + } + + EXPECT_EQ(sinked_text_readouts.size(), 5); + EXPECT_EQ(unsinked_text_readouts.size(), 6); + + size_t num_sinked_text_readouts = 0; + size_t num_iterations = 0; + alloc_.forEachSinkedTextReadout( + [&num_sinked_text_readouts](std::size_t size) { num_sinked_text_readouts = size; }, + [&num_iterations, sink_predicates](Stats::TextReadout& text_readout) { + EXPECT_EQ(sink_predicates->sinkedStatNames().count(text_readout.statName()), 1); + ++num_iterations; + }); + EXPECT_EQ(num_sinked_text_readouts, 5); + EXPECT_EQ(num_iterations, 5); + + // Erase all sinked stats. + sinked_text_readouts.clear(); + num_iterations = 0; + alloc_.forEachSinkedTextReadout( + [&num_sinked_text_readouts](std::size_t size) { num_sinked_text_readouts = size; }, + [&num_iterations](Stats::TextReadout&) { ++num_iterations; }); + EXPECT_EQ(num_sinked_text_readouts, 0); + EXPECT_EQ(num_iterations, 0); +} + } // namespace } // namespace Stats } // namespace Envoy diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 0847d9605df29..a5a59f960c781 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -1,9 +1,11 @@ #include +#include #include #include #include "envoy/config/metrics/v3/stats.pb.h" #include "envoy/stats/histogram.h" +#include "envoy/stats/sink.h" #include "source/common/common/c_smart_ptr.h" #include "source/common/event/dispatcher_impl.h" @@ -1564,6 +1566,7 @@ TEST_F(HistogramTest, ParentHistogramBucketSummary) { "B3.6e+06(1,1)", parent_histogram->bucketSummary()); } + class ThreadLocalRealThreadsTestBase : public Thread::RealThreadsTestHelper, public ThreadLocalStoreNoMocksTestBase { protected: @@ -1801,7 +1804,7 @@ TEST_F(HistogramThreadTest, ScopeOverlap) { 2 * NumThreads, ") "))); // Now clear everything, and synchronize the system by calling mergeHistograms(). - // THere should be no more ParentHistograms or TlsHistograms. + // There should be no more ParentHistograms or TlsHistograms. scope2.reset(); histograms.clear(); mergeHistograms(); diff --git a/test/integration/server.h b/test/integration/server.h index 880b899d2f3a9..48e4c5eb08ae4 100644 --- a/test/integration/server.h +++ b/test/integration/server.h @@ -9,6 +9,7 @@ #include "envoy/config/listener/v3/listener.pb.h" #include "envoy/server/options.h" #include "envoy/server/process_context.h" +#include "envoy/stats/histogram.h" #include "envoy/stats/stats.h" #include "source/common/common/assert.h" @@ -281,21 +282,34 @@ class TestIsolatedStoreImpl : public StoreRoot { Thread::LockGuard lock(lock_); return store_.counterFromStatNameWithTags(name, tags); } - void forEachCounter(std::function f_size, - std::function f_stat) const override { + void forEachCounter(Stats::SizeFn f_size, StatFn f_stat) const override { Thread::LockGuard lock(lock_); store_.forEachCounter(f_size, f_stat); } - void forEachGauge(std::function f_size, - std::function f_stat) const override { + void forEachGauge(Stats::SizeFn f_size, StatFn f_stat) const override { Thread::LockGuard lock(lock_); store_.forEachGauge(f_size, f_stat); } - void forEachTextReadout(std::function f_size, - std::function f_stat) const override { + void forEachTextReadout(Stats::SizeFn f_size, StatFn f_stat) const override { Thread::LockGuard lock(lock_); store_.forEachTextReadout(f_size, f_stat); } + void forEachSinkedCounter(Stats::SizeFn f_size, StatFn f_stat) const override { + Thread::LockGuard lock(lock_); + store_.forEachSinkedCounter(f_size, f_stat); + } + void forEachSinkedGauge(Stats::SizeFn f_size, StatFn f_stat) const override { + Thread::LockGuard lock(lock_); + store_.forEachSinkedGauge(f_size, f_stat); + } + void forEachSinkedTextReadout(Stats::SizeFn f_size, StatFn f_stat) const override { + Thread::LockGuard lock(lock_); + store_.forEachSinkedTextReadout(f_size, f_stat); + } + void setSinkPredicates(std::unique_ptr&& sink_predicates) override { + UNREFERENCED_PARAMETER(sink_predicates); + } + Counter& counterFromString(const std::string& name) override { Thread::LockGuard lock(lock_); return store_.counterFromString(name); diff --git a/test/mocks/server/instance.h b/test/mocks/server/instance.h index de4f51099823c..c208614218726 100644 --- a/test/mocks/server/instance.h +++ b/test/mocks/server/instance.h @@ -87,6 +87,7 @@ class MockInstance : public Instance { MOCK_METHOD(Configuration::ServerFactoryContext&, serverFactoryContext, ()); MOCK_METHOD(Configuration::TransportSocketFactoryContext&, transportSocketFactoryContext, ()); MOCK_METHOD(bool, enableReusePortDefault, ()); + MOCK_METHOD(void, setSinkPredicates, (std::unique_ptr &&)); void setDefaultTracingConfig(const envoy::config::trace::v3::Tracing& tracing_config) override { http_context_.setDefaultTracingConfig(tracing_config); @@ -141,6 +142,7 @@ class MockStatsConfig : public virtual StatsConfig { MOCK_METHOD(const std::list&, sinks, (), (const)); MOCK_METHOD(std::chrono::milliseconds, flushInterval, (), (const)); MOCK_METHOD(bool, flushOnAdmin, (), (const)); + MOCK_METHOD(const Stats::SinkPredicates*, sinkPredicates, (), (const)); }; class MockServerFactoryContext : public virtual ServerFactoryContext { diff --git a/test/mocks/stats/mocks.cc b/test/mocks/stats/mocks.cc index 7eba3fc31cda2..6ed277eed2fd1 100644 --- a/test/mocks/stats/mocks.cc +++ b/test/mocks/stats/mocks.cc @@ -70,6 +70,9 @@ MockMetricSnapshot::~MockMetricSnapshot() = default; MockSink::MockSink() = default; MockSink::~MockSink() = default; +MockSinkPredicates::MockSinkPredicates() = default; +MockSinkPredicates::~MockSinkPredicates() = default; + MockStore::MockStore() { ON_CALL(*this, counter(_)).WillByDefault(ReturnRef(counter_)); ON_CALL(*this, gauge(_, _)).WillByDefault(ReturnRef(gauge_)); diff --git a/test/mocks/stats/mocks.h b/test/mocks/stats/mocks.h index 3043a2c35fbb4..fd3d245395510 100644 --- a/test/mocks/stats/mocks.h +++ b/test/mocks/stats/mocks.h @@ -264,6 +264,15 @@ class MockSink : public Sink { MOCK_METHOD(void, onHistogramComplete, (const Histogram& histogram, uint64_t value)); }; +class MockSinkPredicates : public SinkPredicates { +public: + MockSinkPredicates(); + ~MockSinkPredicates() override; + MOCK_METHOD(bool, includeCounter, (const Counter&)); + MOCK_METHOD(bool, includeGauge, (const Gauge&)); + MOCK_METHOD(bool, includeTextReadout, (const TextReadout&)); +}; + class MockStore : public TestUtil::TestStore { public: MockStore(); @@ -286,13 +295,9 @@ class MockStore : public TestUtil::TestStore { MOCK_METHOD(Histogram&, histogramFromString, (const std::string& name, Histogram::Unit unit)); MOCK_METHOD(TextReadout&, textReadout, (const std::string&)); MOCK_METHOD(std::vector, text_readouts, (), (const)); - MOCK_METHOD(void, forEachCounter, - (std::function, std::function), (const)); - MOCK_METHOD(void, forEachGauge, - (std::function, std::function), (const)); - MOCK_METHOD(void, forEachTextReadout, - (std::function, std::function), - (const)); + MOCK_METHOD(void, forEachCounter, (SizeFn, StatFn), (const)); + MOCK_METHOD(void, forEachGauge, (SizeFn, StatFn), (const)); + MOCK_METHOD(void, forEachTextReadout, (SizeFn, StatFn), (const)); MOCK_METHOD(CounterOptConstRef, findCounter, (StatName), (const)); MOCK_METHOD(GaugeOptConstRef, findGauge, (StatName), (const)); diff --git a/test/server/config_validation/server_test.cc b/test/server/config_validation/server_test.cc index 14611beda5a60..d65e7354fa651 100644 --- a/test/server/config_validation/server_test.cc +++ b/test/server/config_validation/server_test.cc @@ -1,3 +1,4 @@ +#include #include #include "envoy/server/filter_config.h" @@ -129,6 +130,7 @@ TEST_P(ValidationServerTest, NoopLifecycleNotifier) { server.registerCallback(ServerLifecycleNotifier::Stage::ShutdownExit, [] { FAIL(); }); server.registerCallback(ServerLifecycleNotifier::Stage::ShutdownExit, [](Event::PostCb) { FAIL(); }); + server.setSinkPredicates(std::make_unique>()); server.shutdown(); } diff --git a/test/server/server_stats_flush_benchmark_test.cc b/test/server/server_stats_flush_benchmark_test.cc index 98790c5c65d56..895967523c624 100644 --- a/test/server/server_stats_flush_benchmark_test.cc +++ b/test/server/server_stats_flush_benchmark_test.cc @@ -19,10 +19,28 @@ namespace Envoy { +class TestSinkPredicates : public Stats::SinkPredicates { +public: + bool includeCounter(const Stats::Counter&) override { return (++num_counters_) % 10 == 0; } + bool includeGauge(const Stats::Gauge&) override { return (++num_gauges_) % 10 == 0; } + bool includeTextReadout(const Stats::TextReadout&) override { + return (++num_text_readouts_) % 10 == 0; + } + +private: + size_t num_counters_ = 0; + size_t num_gauges_ = 0; + size_t num_text_readouts_ = 0; +}; + class StatsSinkFlushSpeedTest { public: - StatsSinkFlushSpeedTest(size_t const num_stats) + StatsSinkFlushSpeedTest(size_t const num_stats, bool set_sink_predicates = false) : pool_(symbol_table_), stats_allocator_(symbol_table_), stats_store_(stats_allocator_) { + if (set_sink_predicates) { + stats_store_.setSinkPredicates( + std::unique_ptr{std::make_unique()}); + } // Create counters for (uint64_t idx = 0; idx < num_stats; ++idx) { @@ -40,6 +58,12 @@ class StatsSinkFlushSpeedTest { auto stat_name = pool_.add(absl::StrCat("text_readout.", idx)); stats_store_.textReadoutFromStatName(stat_name).set(absl::StrCat("text_readout.", idx)); } + + // Create histograms + for (uint64_t idx = 0; idx < num_stats; ++idx) { + std::string stat_name(absl::StrCat("histogram.", idx)); + stats_store_.histogramFromString(stat_name, Stats::Histogram::Unit::Unspecified); + } } void test(::benchmark::State& state) { @@ -69,6 +93,22 @@ static void bmFlushToSinks(::benchmark::State& state) { StatsSinkFlushSpeedTest speed_test(state.range(0)); speed_test.test(state); } + +static void bmFlushToSinksWithPredicatesSet(::benchmark::State& state) { + // Skip expensive benchmarks for unit tests. + if (benchmark::skipExpensiveBenchmarks() && state.range(0) > 100) { + state.SkipWithError("Skipping expensive benchmark"); + return; + } + + StatsSinkFlushSpeedTest speed_test(state.range(0), true); + speed_test.test(state); +} + BENCHMARK(bmFlushToSinks)->Unit(::benchmark::kMillisecond)->RangeMultiplier(10)->Range(10, 1000000); +BENCHMARK(bmFlushToSinksWithPredicatesSet) + ->Unit(::benchmark::kMillisecond) + ->RangeMultiplier(10) + ->Range(10, 1000000); } // namespace Envoy diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 80ca75656c1e0..0e4e9aaf1e3e1 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -1109,6 +1109,7 @@ siginfo signalstack siloed sim +sinked sizeof smatch snapshotted