Skip to content
8 changes: 5 additions & 3 deletions source/extensions/stat_sinks/metrics_service/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ MetricsServiceSinkFactory::createStatsSink(const Protobuf::Message& config,
const auto& transport_api_version = sink_config.transport_api_version();
ENVOY_LOG(debug, "Metrics Service gRPC service configuration: {}", grpc_service.DebugString());

std::shared_ptr<GrpcMetricsStreamer> grpc_metrics_streamer =
std::make_shared<GrpcMetricsStreamerImpl>(
std::shared_ptr<GrpcMetricsStreamer<envoy::service::metrics::v3::StreamMetricsMessage,
envoy::service::metrics::v3::StreamMetricsResponse>>
grpc_metrics_streamer = std::make_shared<GrpcMetricsStreamerImpl>(
server.clusterManager().grpcAsyncClientManager().factoryForGrpcService(
grpc_service, server.scope(), false),
server.localInfo(), transport_api_version);

return std::make_unique<MetricsServiceSink>(
return std::make_unique<MetricsServiceSink<envoy::service::metrics::v3::StreamMetricsMessage,
envoy::service::metrics::v3::StreamMetricsResponse>>(
grpc_metrics_streamer,
PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, report_counters_as_deltas, false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@ namespace MetricsService {
GrpcMetricsStreamerImpl::GrpcMetricsStreamerImpl(
Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info,
envoy::config::core::v3::ApiVersion transport_api_version)
: client_(factory->create()), local_info_(local_info),
: GrpcMetricsStreamer<envoy::service::metrics::v3::StreamMetricsMessage,
envoy::service::metrics::v3::StreamMetricsResponse>(*factory),
local_info_(local_info),
service_method_(
Grpc::VersionedMethods("envoy.service.metrics.v3.MetricsService.StreamMetrics",
"envoy.service.metrics.v2.MetricsService.StreamMetrics")
.getMethodDescriptorForVersion(transport_api_version)),
transport_api_version_(transport_api_version) {}

void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMessage& message) {
void GrpcMetricsStreamerImpl::send(MetricsPtr&& metrics) {
envoy::service::metrics::v3::StreamMetricsMessage message;
message.mutable_envoy_metrics()->Reserve(metrics->size());
message.mutable_envoy_metrics()->MergeFrom(*metrics);

if (stream_ == nullptr) {
stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
// For perf reasons, the identifier is only sent on establishing the stream.
auto* identifier = message.mutable_identifier();
*identifier->mutable_node() = local_info_.node();
}
Expand All @@ -39,17 +46,45 @@ void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMes
}
}

MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer,
const bool report_counters_as_deltas)
: grpc_metrics_streamer_(grpc_metrics_streamer),
report_counters_as_deltas_(report_counters_as_deltas) {}

void MetricsServiceSink::flushCounter(
const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, int64_t snapshot_time_ms) {
io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics();
metrics_family->set_type(io::prometheus::client::MetricType::COUNTER);
metrics_family->set_name(counter_snapshot.counter_.get().name());
auto* metric = metrics_family->add_metric();
MetricsPtr MetricsFlusher::flush(Stats::MetricSnapshot& snapshot) const {
auto metrics =
std::make_unique<Envoy::Protobuf::RepeatedPtrField<io::prometheus::client::MetricFamily>>();
Comment on lines +50 to +51
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to need to override this also? Or if you do will you override the entire flusher?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we overrode, then yes, we would have to override the entire flusher. This proto struct is the minimal container that the flusher uses to aggregate metrics, so I don't see us changing it; unless we wanted to change the metrics format we use on the wire, and in the server-side service. From the core use in Envoy Mobile, and from my conversations with @jingwei99, I haven't come up with a good use case for changing the data format.


// TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we
// actually preallocate the submessages and then pass ownership to the proto (rather than just
// preallocating the pointer array).
metrics->Reserve(snapshot.counters().size() + snapshot.gauges().size() +
snapshot.histograms().size());
int64_t snapshot_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
snapshot.snapshotTime().time_since_epoch())
.count();
for (const auto& counter : snapshot.counters()) {
if (counter.counter_.get().used()) {
flushCounter(*metrics->Add(), counter, snapshot_time_ms);
}
}

for (const auto& gauge : snapshot.gauges()) {
if (gauge.get().used()) {
flushGauge(*metrics->Add(), gauge.get(), snapshot_time_ms);
}
}

for (const auto& histogram : snapshot.histograms()) {
if (histogram.get().used()) {
flushHistogram(*metrics->Add(), *metrics->Add(), histogram.get(), snapshot_time_ms);
}
}

return metrics;
}

void MetricsFlusher::flushCounter(io::prometheus::client::MetricFamily& metrics_family,
const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot,
int64_t snapshot_time_ms) const {
metrics_family.set_type(io::prometheus::client::MetricType::COUNTER);
metrics_family.set_name(counter_snapshot.counter_.get().name());
auto* metric = metrics_family.add_metric();
metric->set_timestamp_ms(snapshot_time_ms);
auto* counter_metric = metric->mutable_counter();
if (report_counters_as_deltas_) {
Expand All @@ -59,27 +94,28 @@ void MetricsServiceSink::flushCounter(
}
}

void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms) {
io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics();
metrics_family->set_type(io::prometheus::client::MetricType::GAUGE);
metrics_family->set_name(gauge.name());
auto* metric = metrics_family->add_metric();
void MetricsFlusher::flushGauge(io::prometheus::client::MetricFamily& metrics_family,
const Stats::Gauge& gauge, int64_t snapshot_time_ms) const {
metrics_family.set_type(io::prometheus::client::MetricType::GAUGE);
metrics_family.set_name(gauge.name());
auto* metric = metrics_family.add_metric();
metric->set_timestamp_ms(snapshot_time_ms);
auto* gauge_metric = metric->mutable_gauge();
gauge_metric->set_value(gauge.value());
}

void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_histogram,
int64_t snapshot_time_ms) {
void MetricsFlusher::flushHistogram(io::prometheus::client::MetricFamily& summary_metrics_family,
io::prometheus::client::MetricFamily& histogram_metrics_family,
const Stats::ParentHistogram& envoy_histogram,
int64_t snapshot_time_ms) const {
// TODO(ramaraochavali): Currently we are sending both quantile information and bucket
// information. We should make this configurable if it turns out that sending both affects
// performance.

// Add summary information for histograms.
io::prometheus::client::MetricFamily* summary_metrics_family = message_.add_envoy_metrics();
summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY);
summary_metrics_family->set_name(envoy_histogram.name());
auto* summary_metric = summary_metrics_family->add_metric();
summary_metrics_family.set_type(io::prometheus::client::MetricType::SUMMARY);
summary_metrics_family.set_name(envoy_histogram.name());
auto* summary_metric = summary_metrics_family.add_metric();
summary_metric->set_timestamp_ms(snapshot_time_ms);
auto* summary = summary_metric->mutable_summary();
const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics();
Expand All @@ -90,10 +126,9 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist
}

// Add bucket information for histograms.
io::prometheus::client::MetricFamily* histogram_metrics_family = message_.add_envoy_metrics();
histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM);
histogram_metrics_family->set_name(envoy_histogram.name());
auto* histogram_metric = histogram_metrics_family->add_metric();
histogram_metrics_family.set_type(io::prometheus::client::MetricType::HISTOGRAM);
histogram_metrics_family.set_name(envoy_histogram.name());
auto* histogram_metric = histogram_metrics_family.add_metric();
histogram_metric->set_timestamp_ms(snapshot_time_ms);
auto* histogram = histogram_metric->mutable_histogram();
histogram->set_sample_count(hist_stats.sampleCount());
Expand All @@ -104,43 +139,6 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist
bucket->set_cumulative_count(hist_stats.computedBuckets()[i]);
}
}

void MetricsServiceSink::flush(Stats::MetricSnapshot& snapshot) {
message_.clear_envoy_metrics();

// TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we
// actually preallocate the submessages and then pass ownership to the proto (rather than just
// preallocating the pointer array).
message_.mutable_envoy_metrics()->Reserve(snapshot.counters().size() + snapshot.gauges().size() +
snapshot.histograms().size());
int64_t snapshot_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
snapshot.snapshotTime().time_since_epoch())
.count();
for (const auto& counter : snapshot.counters()) {
if (counter.counter_.get().used()) {
flushCounter(counter, snapshot_time_ms);
}
}

for (const auto& gauge : snapshot.gauges()) {
if (gauge.get().used()) {
flushGauge(gauge.get(), snapshot_time_ms);
}
}

for (const auto& histogram : snapshot.histograms()) {
if (histogram.get().used()) {
flushHistogram(histogram.get(), snapshot_time_ms);
}
}

grpc_metrics_streamer_->send(message_);
// for perf reasons, clear the identifier after the first flush.
if (message_.has_identifier()) {
message_.clear_identifier();
}
}

} // namespace MetricsService
} // namespace StatSinks
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,79 +20,106 @@ namespace Extensions {
namespace StatSinks {
namespace MetricsService {

using MetricsPtr =
std::unique_ptr<Envoy::Protobuf::RepeatedPtrField<io::prometheus::client::MetricFamily>>;

/**
* Interface for metrics streamer.
*/
class GrpcMetricsStreamer
: public Grpc::AsyncStreamCallbacks<envoy::service::metrics::v3::StreamMetricsResponse> {
template <class RequestProto, class ResponseProto>
class GrpcMetricsStreamer : public Grpc::AsyncStreamCallbacks<ResponseProto> {
public:
explicit GrpcMetricsStreamer(Grpc::AsyncClientFactory& factory) : client_(factory.create()) {}
~GrpcMetricsStreamer() override = default;

/**
* Send Metrics Message.
* @param message supplies the metrics to send.
*/
virtual void send(envoy::service::metrics::v3::StreamMetricsMessage& message) PURE;
virtual void send(MetricsPtr&& metrics) PURE;

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
void
onReceiveMessage(std::unique_ptr<envoy::service::metrics::v3::StreamMetricsResponse>&&) override {
}
void onReceiveMessage(std::unique_ptr<ResponseProto>&&) override {}
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override{};

protected:
Grpc::AsyncStream<RequestProto> stream_{};
Grpc::AsyncClient<RequestProto, ResponseProto> client_;
};

using GrpcMetricsStreamerSharedPtr = std::shared_ptr<GrpcMetricsStreamer>;
template <class RequestProto, class ResponseProto>
using GrpcMetricsStreamerSharedPtr =
std::shared_ptr<GrpcMetricsStreamer<RequestProto, ResponseProto>>;

/**
* Production implementation of GrpcMetricsStreamer
*/
class GrpcMetricsStreamerImpl : public Singleton::Instance, public GrpcMetricsStreamer {
class GrpcMetricsStreamerImpl
: public Singleton::Instance,
public GrpcMetricsStreamer<envoy::service::metrics::v3::StreamMetricsMessage,
envoy::service::metrics::v3::StreamMetricsResponse> {
public:
GrpcMetricsStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory,
const LocalInfo::LocalInfo& local_info,
envoy::config::core::v3::ApiVersion transport_api_version);

// GrpcMetricsStreamer
void send(envoy::service::metrics::v3::StreamMetricsMessage& message) override;
void send(MetricsPtr&& metrics) override;

// Grpc::AsyncStreamCallbacks
void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override { stream_ = nullptr; }

private:
Grpc::AsyncStream<envoy::service::metrics::v3::StreamMetricsMessage> stream_{};
Grpc::AsyncClient<envoy::service::metrics::v3::StreamMetricsMessage,
envoy::service::metrics::v3::StreamMetricsResponse>
client_;
const LocalInfo::LocalInfo& local_info_;
const Protobuf::MethodDescriptor& service_method_;
const envoy::config::core::v3::ApiVersion transport_api_version_;
};

using GrpcMetricsStreamerImplPtr = std::unique_ptr<GrpcMetricsStreamerImpl>;

class MetricsFlusher {
public:
explicit MetricsFlusher(const bool report_counters_as_deltas)
: report_counters_as_deltas_(report_counters_as_deltas) {}

MetricsPtr flush(Stats::MetricSnapshot& snapshot) const;

private:
void flushCounter(io::prometheus::client::MetricFamily& metrics_family,
const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot,
int64_t snapshot_time_ms) const;
void flushGauge(io::prometheus::client::MetricFamily& metrics_family, const Stats::Gauge& gauge,
int64_t snapshot_time_ms) const;
void flushHistogram(io::prometheus::client::MetricFamily& summary_metrics_family,
io::prometheus::client::MetricFamily& histogram_metrics_family,
const Stats::ParentHistogram& envoy_histogram,
int64_t snapshot_time_ms) const;

const bool report_counters_as_deltas_;
};

/**
* Stat Sink implementation of Metrics Service.
* Stat Sink that flushes metrics via a gRPC service.
*/
class MetricsServiceSink : public Stats::Sink {
template <class RequestProto, class ResponseProto> class MetricsServiceSink : public Stats::Sink {
public:
// MetricsService::Sink
MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer,
const bool report_counters_as_deltas);
void flush(Stats::MetricSnapshot& snapshot) override;
void onHistogramComplete(const Stats::Histogram&, uint64_t) override {}
MetricsServiceSink(
const GrpcMetricsStreamerSharedPtr<RequestProto, ResponseProto>& grpc_metrics_streamer,
const bool report_counters_as_deltas)
: flusher_(report_counters_as_deltas), grpc_metrics_streamer_(grpc_metrics_streamer) {}

void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot,
int64_t snapshot_time_ms);
void flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms);
void flushHistogram(const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms);
void flush(Stats::MetricSnapshot& snapshot) override {
grpc_metrics_streamer_->send(flusher_.flush(snapshot));
}
void onHistogramComplete(const Stats::Histogram&, uint64_t) override {}

private:
GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_;
envoy::service::metrics::v3::StreamMetricsMessage message_;
const bool report_counters_as_deltas_;
const MetricsFlusher flusher_;
GrpcMetricsStreamerSharedPtr<RequestProto, ResponseProto> grpc_metrics_streamer_;
};

} // namespace MetricsService
Expand Down
Loading