Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
deb76f8
Add ability to filter stats to be flushed to sinks.
pradeepcrao Oct 27, 2021
6215133
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Oct 27, 2021
5c593e3
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Oct 27, 2021
e0f6a99
Add tests for histograms.
pradeepcrao Nov 9, 2021
58d423a
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 9, 2021
d82fb8f
Fix format.
pradeepcrao Nov 9, 2021
b8c7472
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 9, 2021
e0ddece
Address feedback. Revert histograms change (for future PR).
pradeepcrao Nov 9, 2021
9b29368
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 9, 2021
c669b51
Address feedback.
pradeepcrao Nov 10, 2021
97d10c4
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 10, 2021
dee92bf
Remove unneeded forward declaration. Fix comments.
pradeepcrao Nov 10, 2021
bfb1f42
Fix coverage.
pradeepcrao Nov 10, 2021
cd77cb3
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 10, 2021
e5a3439
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 11, 2021
a4d76cb
Address feedback.
pradeepcrao Nov 12, 2021
1d27687
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 12, 2021
722e5ec
Address feedback.
pradeepcrao Nov 15, 2021
260ff78
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 15, 2021
7d96f81
Address feedback.
pradeepcrao Nov 18, 2021
670d7a1
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 18, 2021
948f2c2
Address feedback.
pradeepcrao Nov 22, 2021
90790a0
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 22, 2021
eff2e29
Address feedback.
pradeepcrao Nov 29, 2021
cec764f
Merge remote-tracking branch 'upstream/main' into stats_filter2
pradeepcrao Nov 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions envoy/server/instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <chrono>
#include <cstdint>
#include <memory>
#include <string>

#include "envoy/access_log/access_log.h"
Expand Down Expand Up @@ -31,6 +32,11 @@
#include "envoy/upstream/cluster_manager.h"

namespace Envoy {

namespace Stats {
class SinkPredicates;
}

namespace Server {

/**
Expand Down Expand Up @@ -269,6 +275,12 @@ class Instance {
* TODO(mattklein123): This can be removed when version 1.20.0 is no longer supported.
*/
virtual bool enableReusePortDefault() PURE;

/**
* Set predicates for filtering counters, gauges and text readouts to be flushed to sinks.
*/
virtual void
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doxygen for this method.

setSinkPredicates(std::unique_ptr<Envoy::Stats::SinkPredicates>&& sink_predicates) PURE;
};

} // namespace Server
Expand Down
36 changes: 26 additions & 10 deletions envoy/stats/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
namespace Envoy {
namespace Stats {

class Sink;
class SinkPredicates;

/**
* Abstract interface for allocating statistics. Implementations can
* be created utilizing a single fixed-size block suitable for
Expand Down Expand Up @@ -70,19 +73,32 @@ class Allocator {
virtual void markTextReadoutForDeletion(const TextReadoutSharedPtr& text_readout) PURE;

/**
* Iterate over all stats that need to be added to a sink. Note, that implementations can
* Iterate over all stats. Note, that implementations can potentially hold on to a mutex that
* will deadlock if the passed in functors try to create or delete a stat.
* @param f_size functor that is provided the current number of all stats. Note that this is
* called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat at a time from the stats container.
*/
virtual void forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const PURE;
virtual void forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const PURE;
virtual void forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const PURE;

/**
* Iterate over all stats that need to be flushed to sinks. Note, that implementations can
* potentially hold on to a mutex that will deadlock if the passed in functors try to create
* or delete a stat.
* @param f_size functor that is provided the number of all stats in the sink. Note this is
* called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat in the sink at a time.
* @param f_size functor that is provided the number of all stats that will be flushed to sinks.
* Note that this is called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat that will be flushed to sinks, at a time.
*/
virtual void forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const PURE;
virtual void forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const PURE;
virtual void forEachSinkedTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const PURE;

/**
* Set the predicates to filter counters, gauges and text readouts for sink.
*/
virtual void forEachCounter(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Counter&)> f_stat) const PURE;
virtual void forEachGauge(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Gauge&)> f_stat) const PURE;
virtual void forEachTextReadout(std::function<void(std::size_t)> f_size,
std::function<void(Stats::TextReadout&)> f_stat) const PURE;
virtual void setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) PURE;

// TODO(jmarantz): create a parallel mechanism to instantiate histograms. At
// the moment, histograms don't fit the same pattern of counters and gauges
Expand Down
23 changes: 23 additions & 0 deletions envoy/stats/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,29 @@ class MetricSnapshot {
virtual SystemTime snapshotTime() const PURE;
};

/**
* A class to define predicates to filter counters, gauges and text readouts for flushing to sinks.
*/
class SinkPredicates {
public:
virtual ~SinkPredicates() = default;

/**
* @return true if @param counter needs to be flushed to sinks.
*/
virtual bool includeCounter(const Counter& counter) PURE;

/**
* @return true if @param gague needs to be flushed to sinks.
*/
virtual bool includeGauge(const Gauge& gauge) PURE;

/**
* @return true if @param text_readout needs to be flushed to sinks.
*/
virtual bool includeTextReadout(const TextReadout& text_readout) PURE;
};

/**
* A sink for stats. Each sink is responsible for writing stats to a backing store.
*/
Expand Down
10 changes: 10 additions & 0 deletions envoy/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,15 @@ class TextReadout : public virtual Metric {

using TextReadoutSharedPtr = RefcountPtr<TextReadout>;

/**
* Callback invoked to provide size of stats container.
*/
using SizeFn = std::function<void(std::size_t)>;

/**
* Callback invoked for each stat during iteration.
*/
template <typename Stat> using StatFn = std::function<void(Stat&)>;

} // namespace Stats
} // namespace Envoy
40 changes: 28 additions & 12 deletions envoy/stats/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Instance;
namespace Stats {

class Sink;
class SinkPredicates;

/**
* A store for all known counters, gauges, and timers.
Expand Down Expand Up @@ -51,20 +52,27 @@ class Store : public Scope {
virtual std::vector<ParentHistogramSharedPtr> histograms() const PURE;

/**
* Iterate over all stats that need to be added to a sink. Note, that implementations can
* potentially hold on to a mutex that will deadlock if the passed in functors try to create
* or delete a stat.
* @param f_size functor that is provided the number of all stats in the sink.
* @param f_stat functor that is provided one stat in the sink at a time.
* Iterate over all stats. Note, that implementations can potentially hold on to a mutex that
* will deadlock if the passed in functors try to create or delete a stat.
* @param f_size functor that is provided the current number of all stats. Note that this is
* called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat at a time from the stats container.
*/
virtual void forEachCounter(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Counter&)> f_stat) const PURE;

virtual void forEachGauge(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Gauge&)> f_stat) const PURE;
virtual void forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const PURE;
virtual void forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const PURE;
virtual void forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const PURE;

virtual void forEachTextReadout(std::function<void(std::size_t)> f_size,
std::function<void(Stats::TextReadout&)> f_stat) const PURE;
/**
* Iterate over all stats that need to be flushed to sinks. Note, that implementations can
* potentially hold on to a mutex that will deadlock if the passed in functors try to create
* or delete a stat.
* @param f_size functor that is provided the number of all stats that will be flushed to sinks.
* Note that this is called only once, prior to any calls to f_stat.
* @param f_stat functor that is provided one stat that will be flushed to sinks, at a time.
*/
virtual void forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const PURE;
virtual void forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const PURE;
virtual void forEachSinkedTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const PURE;
};

using StorePtr = std::unique_ptr<Store>;
Expand Down Expand Up @@ -123,6 +131,14 @@ class StoreRoot : public Store {
* method would be asserted.
*/
virtual void mergeHistograms(PostMergeCb merge_complete_cb) PURE;

/**
* Set predicates for filtering counters, gauges and text readouts to be flushed to sinks.
* Note that if the sink predicates object is set, we do not send non-sink stats over to the
* child process during hot restart. This will result in the admin stats console being wrong
* during hot restart.
*/
virtual void setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) PURE;
};

using StoreRootPtr = std::unique_ptr<StoreRoot>;
Expand Down
94 changes: 88 additions & 6 deletions source/common/stats/allocator_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <algorithm>
#include <cstdint>

#include "envoy/stats/sink.h"
#include "envoy/stats/stats.h"
#include "envoy/stats/symbol_table.h"

Expand Down Expand Up @@ -144,6 +145,7 @@ class CounterImpl : public StatsSharedImpl<Counter> {
void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
const size_t count = alloc_.counters_.erase(statName());
ASSERT(count == 1);
alloc_.sinked_counters_.erase(this);
}

// Stats::Counter
Expand Down Expand Up @@ -188,6 +190,7 @@ class GaugeImpl : public StatsSharedImpl<Gauge> {
void removeFromSetLockHeld() override ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) {
const size_t count = alloc_.gauges_.erase(statName());
ASSERT(count == 1);
alloc_.sinked_gauges_.erase(this);
}

// Stats::Gauge
Expand Down Expand Up @@ -260,6 +263,7 @@ class TextReadoutImpl : public StatsSharedImpl<TextReadout> {
void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
const size_t count = alloc_.text_readouts_.erase(statName());
ASSERT(count == 1);
alloc_.sinked_text_readouts_.erase(this);
}

// Stats::TextReadout
Expand Down Expand Up @@ -289,6 +293,11 @@ CounterSharedPtr AllocatorImpl::makeCounter(StatName name, StatName tag_extracte
}
auto counter = CounterSharedPtr(makeCounterInternal(name, tag_extracted_name, stat_name_tags));
counters_.insert(counter.get());
// Add counter to sinked_counters_ if it matches the sink predicate.
if (sink_predicates_ != nullptr && sink_predicates_->includeCounter(*counter)) {
auto val = sinked_counters_.insert(counter.get());
ASSERT(val.second);
}
return counter;
}

Expand All @@ -305,6 +314,11 @@ GaugeSharedPtr AllocatorImpl::makeGauge(StatName name, StatName tag_extracted_na
auto gauge =
GaugeSharedPtr(new GaugeImpl(name, *this, tag_extracted_name, stat_name_tags, import_mode));
gauges_.insert(gauge.get());
// Add gauge to sinked_gauges_ if it matches the sink predicate.
if (sink_predicates_ != nullptr && sink_predicates_->includeGauge(*gauge)) {
auto val = sinked_gauges_.insert(gauge.get());
ASSERT(val.second);
}
return gauge;
}

Expand All @@ -320,6 +334,11 @@ TextReadoutSharedPtr AllocatorImpl::makeTextReadout(StatName name, StatName tag_
auto text_readout =
TextReadoutSharedPtr(new TextReadoutImpl(name, *this, tag_extracted_name, stat_name_tags));
text_readouts_.insert(text_readout.get());
// Add text_readout to sinked_text_readouts_ if it matches the sink predicate.
if (sink_predicates_ != nullptr && sink_predicates_->includeTextReadout(*text_readout)) {
auto val = sinked_text_readouts_.insert(text_readout.get());
ASSERT(val.second);
}
return text_readout;
}

Expand All @@ -336,8 +355,7 @@ Counter* AllocatorImpl::makeCounterInternal(StatName name, StatName tag_extracte
return new CounterImpl(name, *this, tag_extracted_name, stat_name_tags);
}

void AllocatorImpl::forEachCounter(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Counter&)> f_stat) const {
void AllocatorImpl::forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
Thread::LockGuard lock(mutex_);
if (f_size != nullptr) {
f_size(counters_.size());
Expand All @@ -347,8 +365,7 @@ void AllocatorImpl::forEachCounter(std::function<void(std::size_t)> f_size,
}
}

void AllocatorImpl::forEachGauge(std::function<void(std::size_t)> f_size,
std::function<void(Stats::Gauge&)> f_stat) const {
void AllocatorImpl::forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
Thread::LockGuard lock(mutex_);
if (f_size != nullptr) {
f_size(gauges_.size());
Expand All @@ -358,8 +375,7 @@ void AllocatorImpl::forEachGauge(std::function<void(std::size_t)> f_size,
}
}

void AllocatorImpl::forEachTextReadout(std::function<void(std::size_t)> f_size,
std::function<void(Stats::TextReadout&)> f_stat) const {
void AllocatorImpl::forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
Thread::LockGuard lock(mutex_);
if (f_size != nullptr) {
f_size(text_readouts_.size());
Expand All @@ -369,6 +385,69 @@ void AllocatorImpl::forEachTextReadout(std::function<void(std::size_t)> f_size,
}
}

void AllocatorImpl::forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
if (sink_predicates_ != nullptr) {
Thread::LockGuard lock(mutex_);
f_size(sinked_counters_.size());
for (auto counter : sinked_counters_) {
f_stat(*counter);
}
} else {
forEachCounter(f_size, f_stat);
}
}

void AllocatorImpl::forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
if (sink_predicates_ != nullptr) {
Thread::LockGuard lock(mutex_);
f_size(sinked_gauges_.size());
for (auto gauge : sinked_gauges_) {
f_stat(*gauge);
}
} else {
forEachGauge(f_size, f_stat);
}
}

void AllocatorImpl::forEachSinkedTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
if (sink_predicates_ != nullptr) {
Thread::LockGuard lock(mutex_);
f_size(sinked_text_readouts_.size());
for (auto text_readout : sinked_text_readouts_) {
f_stat(*text_readout);
}
} else {
forEachTextReadout(f_size, f_stat);
}
}

void AllocatorImpl::setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) {
Thread::LockGuard lock(mutex_);
ASSERT(sink_predicates_ == nullptr);
sink_predicates_ = std::move(sink_predicates);
sinked_counters_.clear();
sinked_gauges_.clear();
sinked_text_readouts_.clear();
// Add counters to the set of sinked counters.
for (auto& counter : counters_) {
if (sink_predicates_->includeCounter(*counter)) {
sinked_counters_.emplace(counter);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you clear all of these before this function is called? Not sure when this can get called but seems like the safe thing to do? If not can you comment why?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function is meant to be called just once, but there's nothing enforcing this other than the check that sink_predicates_ is nullptr further up in this method. Added lines to clear the sets of sinked stats.

}
}
// Add gauges to the set of sinked gauges.
for (auto& gauge : gauges_) {
if (sink_predicates_->includeGauge(*gauge)) {
sinked_gauges_.insert(gauge);
}
}
// Add text_readouts to the set of sinked text readouts.
for (auto& text_readout : text_readouts_) {
if (sink_predicates_->includeTextReadout(*text_readout)) {
sinked_text_readouts_.insert(text_readout);
}
}
}

void AllocatorImpl::markCounterForDeletion(const CounterSharedPtr& counter) {
Thread::LockGuard lock(mutex_);
auto iter = counters_.find(counter->statName());
Expand All @@ -380,6 +459,7 @@ void AllocatorImpl::markCounterForDeletion(const CounterSharedPtr& counter) {
// Duplicates are ASSERTed in ~AllocatorImpl.
deleted_counters_.emplace_back(*iter);
counters_.erase(iter);
sinked_counters_.erase(counter.get());
}

void AllocatorImpl::markGaugeForDeletion(const GaugeSharedPtr& gauge) {
Expand All @@ -393,6 +473,7 @@ void AllocatorImpl::markGaugeForDeletion(const GaugeSharedPtr& gauge) {
// Duplicates are ASSERTed in ~AllocatorImpl.
deleted_gauges_.emplace_back(*iter);
gauges_.erase(iter);
sinked_gauges_.erase(gauge.get());
}

void AllocatorImpl::markTextReadoutForDeletion(const TextReadoutSharedPtr& text_readout) {
Expand All @@ -406,6 +487,7 @@ void AllocatorImpl::markTextReadoutForDeletion(const TextReadoutSharedPtr& text_
// Duplicates are ASSERTed in ~AllocatorImpl.
deleted_text_readouts_.emplace_back(*iter);
text_readouts_.erase(iter);
sinked_text_readouts_.erase(text_readout.get());
}

} // namespace Stats
Expand Down
Loading