From c637a39e0639e2e900212c3dc1d9853945c1b079 Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Thu, 5 Nov 2020 11:48:54 -0800 Subject: [PATCH 01/10] metrics service sink: generalize the sink and grpc streamer for external use Signed-off-by: Jose Nino --- .../stat_sinks/metrics_service/config.cc | 4 +- .../grpc_metrics_service_impl.cc | 34 ++++++++------ .../grpc_metrics_service_impl.h | 29 +++++++----- .../grpc_metrics_service_impl_test.cc | 46 +++++++++++-------- 4 files changed, 64 insertions(+), 49 deletions(-) diff --git a/source/extensions/stat_sinks/metrics_service/config.cc b/source/extensions/stat_sinks/metrics_service/config.cc index 05228e9a67c40..60dd7f0d15c5b 100644 --- a/source/extensions/stat_sinks/metrics_service/config.cc +++ b/source/extensions/stat_sinks/metrics_service/config.cc @@ -29,8 +29,8 @@ MetricsServiceSinkFactory::createStatsSink(const Protobuf::Message& config, const auto& transport_api_version = sink_config.transport_api_version(); ENVOY_LOG(debug, "Metrics Service gRPC service configuration: {}", grpc_service.DebugString()); - std::shared_ptr grpc_metrics_streamer = - std::make_shared( + std::shared_ptr> + grpc_metrics_streamer = std::make_shared( server.clusterManager().grpcAsyncClientManager().factoryForGrpcService( grpc_service, server.scope(), false), server.localInfo(), transport_api_version); diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index d18bfbf20c838..e581045970d3a 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -28,9 +28,15 @@ GrpcMetricsStreamerImpl::GrpcMetricsStreamerImpl( .getMethodDescriptorForVersion(transport_api_version)), transport_api_version_(transport_api_version) {} -void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMessage& message) { +void GrpcMetricsStreamerImpl::send( + Envoy::Protobuf::RepeatedPtrField& metrics) { + envoy::service::metrics::v3::StreamMetricsMessage message; + message.mutable_envoy_metrics()->Reserve(metrics.size()); + message.mutable_envoy_metrics()->MergeFrom(metrics); + if (stream_ == nullptr) { stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions()); + // for perf reasons, the identifier is only sent on establishing the stream. auto* identifier = message.mutable_identifier(); *identifier->mutable_node() = local_info_.node(); } @@ -39,14 +45,16 @@ void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMes } } -MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, - const bool report_counters_as_deltas) +MetricsServiceSink::MetricsServiceSink( + const GrpcMetricsStreamerSharedPtr& + grpc_metrics_streamer, + const bool report_counters_as_deltas) : grpc_metrics_streamer_(grpc_metrics_streamer), report_counters_as_deltas_(report_counters_as_deltas) {} void MetricsServiceSink::flushCounter( const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, int64_t snapshot_time_ms) { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + io::prometheus::client::MetricFamily* metrics_family = metrics_.Add(); metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); metrics_family->set_name(counter_snapshot.counter_.get().name()); auto* metric = metrics_family->add_metric(); @@ -60,7 +68,7 @@ void MetricsServiceSink::flushCounter( } void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms) { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + io::prometheus::client::MetricFamily* metrics_family = metrics_.Add(); metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); metrics_family->set_name(gauge.name()); auto* metric = metrics_family->add_metric(); @@ -76,7 +84,7 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist // performance. // Add summary information for histograms. - io::prometheus::client::MetricFamily* summary_metrics_family = message_.add_envoy_metrics(); + io::prometheus::client::MetricFamily* summary_metrics_family = metrics_.Add(); summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); summary_metrics_family->set_name(envoy_histogram.name()); auto* summary_metric = summary_metrics_family->add_metric(); @@ -90,7 +98,7 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist } // Add bucket information for histograms. - io::prometheus::client::MetricFamily* histogram_metrics_family = message_.add_envoy_metrics(); + io::prometheus::client::MetricFamily* histogram_metrics_family = metrics_.Add(); histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM); histogram_metrics_family->set_name(envoy_histogram.name()); auto* histogram_metric = histogram_metrics_family->add_metric(); @@ -106,13 +114,13 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist } void MetricsServiceSink::flush(Stats::MetricSnapshot& snapshot) { - message_.clear_envoy_metrics(); + metrics_.Clear(); // TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we // actually preallocate the submessages and then pass ownership to the proto (rather than just // preallocating the pointer array). - message_.mutable_envoy_metrics()->Reserve(snapshot.counters().size() + snapshot.gauges().size() + - snapshot.histograms().size()); + metrics_.Reserve(snapshot.counters().size() + snapshot.gauges().size() + + snapshot.histograms().size()); int64_t snapshot_time_ms = std::chrono::duration_cast( snapshot.snapshotTime().time_since_epoch()) .count(); @@ -134,11 +142,7 @@ void MetricsServiceSink::flush(Stats::MetricSnapshot& snapshot) { } } - grpc_metrics_streamer_->send(message_); - // for perf reasons, clear the identifier after the first flush. - if (message_.has_identifier()) { - message_.clear_identifier(); - } + grpc_metrics_streamer_->send(metrics_); } } // namespace MetricsService diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h index 668c7f3fb5676..ab0a30e1ad3c7 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h @@ -23,8 +23,8 @@ namespace MetricsService { /** * Interface for metrics streamer. */ -class GrpcMetricsStreamer - : public Grpc::AsyncStreamCallbacks { +template +class GrpcMetricsStreamer : public Grpc::AsyncStreamCallbacks { public: ~GrpcMetricsStreamer() override = default; @@ -32,31 +32,34 @@ class GrpcMetricsStreamer * Send Metrics Message. * @param message supplies the metrics to send. */ - virtual void send(envoy::service::metrics::v3::StreamMetricsMessage& message) PURE; + virtual void + send(Envoy::Protobuf::RepeatedPtrField& metrics) PURE; // Grpc::AsyncStreamCallbacks void onCreateInitialMetadata(Http::RequestHeaderMap&) override {} void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {} - void - onReceiveMessage(std::unique_ptr&&) override { - } + void onReceiveMessage(std::unique_ptr&&) override {} void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {} void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override{}; }; -using GrpcMetricsStreamerSharedPtr = std::shared_ptr; +template +using GrpcMetricsStreamerSharedPtr = std::shared_ptr>; /** * Production implementation of GrpcMetricsStreamer */ -class GrpcMetricsStreamerImpl : public Singleton::Instance, public GrpcMetricsStreamer { +class GrpcMetricsStreamerImpl + : public Singleton::Instance, + public GrpcMetricsStreamer { public: GrpcMetricsStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info, envoy::config::core::v3::ApiVersion transport_api_version); // GrpcMetricsStreamer - void send(envoy::service::metrics::v3::StreamMetricsMessage& message) override; + void + send(Envoy::Protobuf::RepeatedPtrField& metrics) override; // Grpc::AsyncStreamCallbacks void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override { stream_ = nullptr; } @@ -79,7 +82,8 @@ using GrpcMetricsStreamerImplPtr = std::unique_ptr; class MetricsServiceSink : public Stats::Sink { public: // MetricsService::Sink - MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, + MetricsServiceSink(const GrpcMetricsStreamerSharedPtr< + envoy::service::metrics::v3::StreamMetricsResponse>& grpc_metrics_streamer, const bool report_counters_as_deltas); void flush(Stats::MetricSnapshot& snapshot) override; void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} @@ -90,8 +94,9 @@ class MetricsServiceSink : public Stats::Sink { void flushHistogram(const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms); private: - GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; - envoy::service::metrics::v3::StreamMetricsMessage message_; + GrpcMetricsStreamerSharedPtr + grpc_metrics_streamer_; + Envoy::Protobuf::RepeatedPtrField metrics_; const bool report_counters_as_deltas_; }; diff --git a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc index 927b710ac7562..61d5c30b6f9b5 100644 --- a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc +++ b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc @@ -62,8 +62,8 @@ TEST_F(GrpcMetricsStreamerImplTest, BasicFlow) { expectStreamStart(stream1, &callbacks1); EXPECT_CALL(local_info_, node()); EXPECT_CALL(stream1, sendMessageRaw_(_, false)); - envoy::service::metrics::v3::StreamMetricsMessage message_metrics1; - streamer_->send(message_metrics1); + Envoy::Protobuf::RepeatedPtrField metrics1; + streamer_->send(metrics1); // Verify that sending an empty response message doesn't do anything bad. callbacks1->onReceiveMessage( std::make_unique()); @@ -81,14 +81,16 @@ TEST_F(GrpcMetricsStreamerImplTest, StreamFailure) { return nullptr; })); EXPECT_CALL(local_info_, node()); - envoy::service::metrics::v3::StreamMetricsMessage message_metrics1; - streamer_->send(message_metrics1); + Envoy::Protobuf::RepeatedPtrField metrics1; + streamer_->send(metrics1); } -class MockGrpcMetricsStreamer : public GrpcMetricsStreamer { +class MockGrpcMetricsStreamer + : public GrpcMetricsStreamer { public: // GrpcMetricsStreamer - MOCK_METHOD(void, send, (envoy::service::metrics::v3::StreamMetricsMessage & message)); + MOCK_METHOD(void, send, + (Envoy::Protobuf::RepeatedPtrField & metrics)); }; class MetricsServiceSinkTest : public testing::Test { @@ -139,17 +141,19 @@ TEST_F(MetricsServiceSinkTest, CheckStatsCount) { snapshot_.gauges_.push_back(*gauge); EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) { - EXPECT_EQ(2, message.envoy_metrics_size()); - })); + .WillOnce(Invoke( + [](Envoy::Protobuf::RepeatedPtrField& metrics) { + EXPECT_EQ(2, metrics.size()); + })); sink.flush(snapshot_); // Verify only newly added metrics come after endFlush call. gauge->used_ = false; EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) { - EXPECT_EQ(1, message.envoy_metrics_size()); - })); + .WillOnce(Invoke( + [](Envoy::Protobuf::RepeatedPtrField& metrics) { + EXPECT_EQ(1, metrics.size()); + })); sink.flush(snapshot_); } @@ -164,10 +168,11 @@ TEST_F(MetricsServiceSinkTest, ReportCountersValues) { snapshot_.counters_.push_back({1, *counter}); EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) { - EXPECT_EQ(1, message.envoy_metrics_size()); - EXPECT_EQ(100, message.envoy_metrics(0).metric(0).counter().value()); - })); + .WillOnce(Invoke( + [](Envoy::Protobuf::RepeatedPtrField& metrics) { + EXPECT_EQ(1, metrics.size()); + EXPECT_EQ(100, metrics[0].metric(0).counter().value()); + })); sink.flush(snapshot_); } @@ -182,10 +187,11 @@ TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) { snapshot_.counters_.push_back({1, *counter}); EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) { - EXPECT_EQ(1, message.envoy_metrics_size()); - EXPECT_EQ(1, message.envoy_metrics(0).metric(0).counter().value()); - })); + .WillOnce(Invoke( + [](Envoy::Protobuf::RepeatedPtrField& metrics) { + EXPECT_EQ(1, metrics.size()); + EXPECT_EQ(1, metrics[0].metric(0).counter().value()); + })); sink.flush(snapshot_); } From 9ff95beb1ca19e95abb344cdc136fff3103c5e70 Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Thu, 5 Nov 2020 13:28:27 -0800 Subject: [PATCH 02/10] last template Signed-off-by: Jose Nino --- .../stat_sinks/metrics_service/config.cc | 2 +- .../grpc_metrics_service_impl.cc | 100 ---------------- .../grpc_metrics_service_impl.h | 111 ++++++++++++++++-- .../grpc_metrics_service_impl_test.cc | 8 +- 4 files changed, 103 insertions(+), 118 deletions(-) diff --git a/source/extensions/stat_sinks/metrics_service/config.cc b/source/extensions/stat_sinks/metrics_service/config.cc index 60dd7f0d15c5b..5e56713150bd5 100644 --- a/source/extensions/stat_sinks/metrics_service/config.cc +++ b/source/extensions/stat_sinks/metrics_service/config.cc @@ -35,7 +35,7 @@ MetricsServiceSinkFactory::createStatsSink(const Protobuf::Message& config, grpc_service, server.scope(), false), server.localInfo(), transport_api_version); - return std::make_unique( + return std::make_unique>( grpc_metrics_streamer, PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, report_counters_as_deltas, false)); } diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index e581045970d3a..08936d5a0b613 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -45,106 +45,6 @@ void GrpcMetricsStreamerImpl::send( } } -MetricsServiceSink::MetricsServiceSink( - const GrpcMetricsStreamerSharedPtr& - grpc_metrics_streamer, - const bool report_counters_as_deltas) - : grpc_metrics_streamer_(grpc_metrics_streamer), - report_counters_as_deltas_(report_counters_as_deltas) {} - -void MetricsServiceSink::flushCounter( - const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, int64_t snapshot_time_ms) { - io::prometheus::client::MetricFamily* metrics_family = metrics_.Add(); - metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); - metrics_family->set_name(counter_snapshot.counter_.get().name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(snapshot_time_ms); - auto* counter_metric = metric->mutable_counter(); - if (report_counters_as_deltas_) { - counter_metric->set_value(counter_snapshot.delta_); - } else { - counter_metric->set_value(counter_snapshot.counter_.get().value()); - } -} - -void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms) { - io::prometheus::client::MetricFamily* metrics_family = metrics_.Add(); - metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); - metrics_family->set_name(gauge.name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(snapshot_time_ms); - auto* gauge_metric = metric->mutable_gauge(); - gauge_metric->set_value(gauge.value()); -} - -void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_histogram, - int64_t snapshot_time_ms) { - // TODO(ramaraochavali): Currently we are sending both quantile information and bucket - // information. We should make this configurable if it turns out that sending both affects - // performance. - - // Add summary information for histograms. - io::prometheus::client::MetricFamily* summary_metrics_family = metrics_.Add(); - summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); - summary_metrics_family->set_name(envoy_histogram.name()); - auto* summary_metric = summary_metrics_family->add_metric(); - summary_metric->set_timestamp_ms(snapshot_time_ms); - auto* summary = summary_metric->mutable_summary(); - const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics(); - for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) { - auto* quantile = summary->add_quantile(); - quantile->set_quantile(hist_stats.supportedQuantiles()[i]); - quantile->set_value(hist_stats.computedQuantiles()[i]); - } - - // Add bucket information for histograms. - io::prometheus::client::MetricFamily* histogram_metrics_family = metrics_.Add(); - histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM); - histogram_metrics_family->set_name(envoy_histogram.name()); - auto* histogram_metric = histogram_metrics_family->add_metric(); - histogram_metric->set_timestamp_ms(snapshot_time_ms); - auto* histogram = histogram_metric->mutable_histogram(); - histogram->set_sample_count(hist_stats.sampleCount()); - histogram->set_sample_sum(hist_stats.sampleSum()); - for (size_t i = 0; i < hist_stats.supportedBuckets().size(); i++) { - auto* bucket = histogram->add_bucket(); - bucket->set_upper_bound(hist_stats.supportedBuckets()[i]); - bucket->set_cumulative_count(hist_stats.computedBuckets()[i]); - } -} - -void MetricsServiceSink::flush(Stats::MetricSnapshot& snapshot) { - metrics_.Clear(); - - // TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we - // actually preallocate the submessages and then pass ownership to the proto (rather than just - // preallocating the pointer array). - metrics_.Reserve(snapshot.counters().size() + snapshot.gauges().size() + - snapshot.histograms().size()); - int64_t snapshot_time_ms = std::chrono::duration_cast( - snapshot.snapshotTime().time_since_epoch()) - .count(); - for (const auto& counter : snapshot.counters()) { - if (counter.counter_.get().used()) { - flushCounter(counter, snapshot_time_ms); - } - } - - for (const auto& gauge : snapshot.gauges()) { - if (gauge.get().used()) { - flushGauge(gauge.get(), snapshot_time_ms); - } - } - - for (const auto& histogram : snapshot.histograms()) { - if (histogram.get().used()) { - flushHistogram(histogram.get(), snapshot_time_ms); - } - } - - grpc_metrics_streamer_->send(metrics_); -} - } // namespace MetricsService } // namespace StatSinks } // namespace Extensions diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h index ab0a30e1ad3c7..95abd89603a9c 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h @@ -77,25 +77,110 @@ class GrpcMetricsStreamerImpl using GrpcMetricsStreamerImplPtr = std::unique_ptr; /** - * Stat Sink implementation of Metrics Service. + * Stat Sink that flushes metrics via a gRPC service. */ -class MetricsServiceSink : public Stats::Sink { +template class MetricsServiceSink : public Stats::Sink { public: // MetricsService::Sink - MetricsServiceSink(const GrpcMetricsStreamerSharedPtr< - envoy::service::metrics::v3::StreamMetricsResponse>& grpc_metrics_streamer, - const bool report_counters_as_deltas); - void flush(Stats::MetricSnapshot& snapshot) override; + MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, + const bool report_counters_as_deltas) + : grpc_metrics_streamer_(grpc_metrics_streamer), + report_counters_as_deltas_(report_counters_as_deltas) {} + void flush(Stats::MetricSnapshot& snapshot) override { + metrics_.Clear(); + + // TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we + // actually preallocate the submessages and then pass ownership to the proto (rather than just + // preallocating the pointer array). + metrics_.Reserve(snapshot.counters().size() + snapshot.gauges().size() + + snapshot.histograms().size()); + int64_t snapshot_time_ms = std::chrono::duration_cast( + snapshot.snapshotTime().time_since_epoch()) + .count(); + for (const auto& counter : snapshot.counters()) { + if (counter.counter_.get().used()) { + flushCounter(counter, snapshot_time_ms); + } + } + + for (const auto& gauge : snapshot.gauges()) { + if (gauge.get().used()) { + flushGauge(gauge.get(), snapshot_time_ms); + } + } + + for (const auto& histogram : snapshot.histograms()) { + if (histogram.get().used()) { + flushHistogram(histogram.get(), snapshot_time_ms); + } + } + + grpc_metrics_streamer_->send(metrics_); + } void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} - void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, - int64_t snapshot_time_ms); - void flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms); - void flushHistogram(const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms); - private: - GrpcMetricsStreamerSharedPtr - grpc_metrics_streamer_; + void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, + int64_t snapshot_time_ms) { + io::prometheus::client::MetricFamily* metrics_family = metrics_.Add(); + metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); + metrics_family->set_name(counter_snapshot.counter_.get().name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(snapshot_time_ms); + auto* counter_metric = metric->mutable_counter(); + if (report_counters_as_deltas_) { + counter_metric->set_value(counter_snapshot.delta_); + } else { + counter_metric->set_value(counter_snapshot.counter_.get().value()); + } + } + + void flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms) { + io::prometheus::client::MetricFamily* metrics_family = metrics_.Add(); + metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); + metrics_family->set_name(gauge.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(snapshot_time_ms); + auto* gauge_metric = metric->mutable_gauge(); + gauge_metric->set_value(gauge.value()); + } + + void flushHistogram(const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms) { + // TODO(ramaraochavali): Currently we are sending both quantile information and bucket + // information. We should make this configurable if it turns out that sending both affects + // performance. + + // Add summary information for histograms. + io::prometheus::client::MetricFamily* summary_metrics_family = metrics_.Add(); + summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); + summary_metrics_family->set_name(envoy_histogram.name()); + auto* summary_metric = summary_metrics_family->add_metric(); + summary_metric->set_timestamp_ms(snapshot_time_ms); + auto* summary = summary_metric->mutable_summary(); + const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics(); + for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) { + auto* quantile = summary->add_quantile(); + quantile->set_quantile(hist_stats.supportedQuantiles()[i]); + quantile->set_value(hist_stats.computedQuantiles()[i]); + } + + // Add bucket information for histograms. + io::prometheus::client::MetricFamily* histogram_metrics_family = metrics_.Add(); + histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM); + histogram_metrics_family->set_name(envoy_histogram.name()); + auto* histogram_metric = histogram_metrics_family->add_metric(); + histogram_metric->set_timestamp_ms(snapshot_time_ms); + auto* histogram = histogram_metric->mutable_histogram(); + histogram->set_sample_count(hist_stats.sampleCount()); + histogram->set_sample_sum(hist_stats.sampleSum()); + for (size_t i = 0; i < hist_stats.supportedBuckets().size(); i++) { + auto* bucket = histogram->add_bucket(); + bucket->set_upper_bound(hist_stats.supportedBuckets()[i]); + bucket->set_cumulative_count(hist_stats.computedBuckets()[i]); + } + } + + GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; Envoy::Protobuf::RepeatedPtrField metrics_; const bool report_counters_as_deltas_; }; diff --git a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc index 61d5c30b6f9b5..c90a6f6048f37 100644 --- a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc +++ b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc @@ -102,7 +102,7 @@ class MetricsServiceSinkTest : public testing::Test { }; TEST_F(MetricsServiceSinkTest, CheckSendCall) { - MetricsServiceSink sink(streamer_, false); + MetricsServiceSink sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -126,7 +126,7 @@ TEST_F(MetricsServiceSinkTest, CheckSendCall) { } TEST_F(MetricsServiceSinkTest, CheckStatsCount) { - MetricsServiceSink sink(streamer_, false); + MetricsServiceSink sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -159,7 +159,7 @@ TEST_F(MetricsServiceSinkTest, CheckStatsCount) { // Test that verifies counters are correctly reported as current value when configured to do so. TEST_F(MetricsServiceSinkTest, ReportCountersValues) { - MetricsServiceSink sink(streamer_, false); + MetricsServiceSink sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -178,7 +178,7 @@ TEST_F(MetricsServiceSinkTest, ReportCountersValues) { // Test that verifies counters are reported as the delta between flushes when configured to do so. TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) { - MetricsServiceSink sink(streamer_, true); + MetricsServiceSink sink(streamer_, true); auto counter = std::make_shared>(); counter->name_ = "test_counter"; From 3d1c1dac18bc9d80a695849158e43180692fcd43 Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Mon, 9 Nov 2020 22:07:47 -0800 Subject: [PATCH 03/10] comments Signed-off-by: Jose Nino --- .../stat_sinks/metrics_service/config.cc | 6 +- .../grpc_metrics_service_impl.cc | 106 +++++++++++- .../grpc_metrics_service_impl.h | 155 ++++++------------ .../grpc_metrics_service_impl_test.cc | 76 +++++---- 4 files changed, 192 insertions(+), 151 deletions(-) diff --git a/source/extensions/stat_sinks/metrics_service/config.cc b/source/extensions/stat_sinks/metrics_service/config.cc index 5e56713150bd5..8b44af894b8b9 100644 --- a/source/extensions/stat_sinks/metrics_service/config.cc +++ b/source/extensions/stat_sinks/metrics_service/config.cc @@ -29,13 +29,15 @@ MetricsServiceSinkFactory::createStatsSink(const Protobuf::Message& config, const auto& transport_api_version = sink_config.transport_api_version(); ENVOY_LOG(debug, "Metrics Service gRPC service configuration: {}", grpc_service.DebugString()); - std::shared_ptr> + std::shared_ptr> grpc_metrics_streamer = std::make_shared( server.clusterManager().grpcAsyncClientManager().factoryForGrpcService( grpc_service, server.scope(), false), server.localInfo(), transport_api_version); - return std::make_unique>( + return std::make_unique>( grpc_metrics_streamer, PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, report_counters_as_deltas, false)); } diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index 08936d5a0b613..7c52aebc01367 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -21,22 +21,23 @@ namespace MetricsService { GrpcMetricsStreamerImpl::GrpcMetricsStreamerImpl( Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info, envoy::config::core::v3::ApiVersion transport_api_version) - : client_(factory->create()), local_info_(local_info), + : GrpcMetricsStreamer(std::move(factory)), + local_info_(local_info), service_method_( Grpc::VersionedMethods("envoy.service.metrics.v3.MetricsService.StreamMetrics", "envoy.service.metrics.v2.MetricsService.StreamMetrics") .getMethodDescriptorForVersion(transport_api_version)), transport_api_version_(transport_api_version) {} -void GrpcMetricsStreamerImpl::send( - Envoy::Protobuf::RepeatedPtrField& metrics) { +void GrpcMetricsStreamerImpl::send(MetricsPtr&& metrics) { envoy::service::metrics::v3::StreamMetricsMessage message; - message.mutable_envoy_metrics()->Reserve(metrics.size()); - message.mutable_envoy_metrics()->MergeFrom(metrics); + message.mutable_envoy_metrics()->Reserve(metrics->size()); + message.mutable_envoy_metrics()->MergeFrom(*metrics); if (stream_ == nullptr) { stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions()); - // for perf reasons, the identifier is only sent on establishing the stream. + // For perf reasons, the identifier is only sent on establishing the stream. auto* identifier = message.mutable_identifier(); *identifier->mutable_node() = local_info_.node(); } @@ -45,6 +46,99 @@ void GrpcMetricsStreamerImpl::send( } } +MetricsPtr MetricsFlusher::flush(Stats::MetricSnapshot& snapshot) const { + auto metrics = + std::make_unique>(); + + // TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we + // actually preallocate the submessages and then pass ownership to the proto (rather than just + // preallocating the pointer array). + metrics->Reserve(snapshot.counters().size() + snapshot.gauges().size() + + snapshot.histograms().size()); + int64_t snapshot_time_ms = std::chrono::duration_cast( + snapshot.snapshotTime().time_since_epoch()) + .count(); + for (const auto& counter : snapshot.counters()) { + if (counter.counter_.get().used()) { + flushCounter(metrics->Add(), counter, snapshot_time_ms); + } + } + + for (const auto& gauge : snapshot.gauges()) { + if (gauge.get().used()) { + flushGauge(metrics->Add(), gauge.get(), snapshot_time_ms); + } + } + + for (const auto& histogram : snapshot.histograms()) { + if (histogram.get().used()) { + flushHistogram(metrics->Add(), metrics->Add(), histogram.get(), snapshot_time_ms); + } + } + + return metrics; +} + +void MetricsFlusher::flushCounter(io::prometheus::client::MetricFamily* metrics_family, + const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, + int64_t snapshot_time_ms) const { + metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); + metrics_family->set_name(counter_snapshot.counter_.get().name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(snapshot_time_ms); + auto* counter_metric = metric->mutable_counter(); + if (report_counters_as_deltas_) { + counter_metric->set_value(counter_snapshot.delta_); + } else { + counter_metric->set_value(counter_snapshot.counter_.get().value()); + } +} + +void MetricsFlusher::flushGauge(io::prometheus::client::MetricFamily* metrics_family, + const Stats::Gauge& gauge, int64_t snapshot_time_ms) const { + metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); + metrics_family->set_name(gauge.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(snapshot_time_ms); + auto* gauge_metric = metric->mutable_gauge(); + gauge_metric->set_value(gauge.value()); +} + +void MetricsFlusher::flushHistogram(io::prometheus::client::MetricFamily* summary_metrics_family, + io::prometheus::client::MetricFamily* histogram_metrics_family, + const Stats::ParentHistogram& envoy_histogram, + int64_t snapshot_time_ms) const { + // TODO(ramaraochavali): Currently we are sending both quantile information and bucket + // information. We should make this configurable if it turns out that sending both affects + // performance. + + // Add summary information for histograms. + summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); + summary_metrics_family->set_name(envoy_histogram.name()); + auto* summary_metric = summary_metrics_family->add_metric(); + summary_metric->set_timestamp_ms(snapshot_time_ms); + auto* summary = summary_metric->mutable_summary(); + const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics(); + for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) { + auto* quantile = summary->add_quantile(); + quantile->set_quantile(hist_stats.supportedQuantiles()[i]); + quantile->set_value(hist_stats.computedQuantiles()[i]); + } + + // Add bucket information for histograms. + histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM); + histogram_metrics_family->set_name(envoy_histogram.name()); + auto* histogram_metric = histogram_metrics_family->add_metric(); + histogram_metric->set_timestamp_ms(snapshot_time_ms); + auto* histogram = histogram_metric->mutable_histogram(); + histogram->set_sample_count(hist_stats.sampleCount()); + histogram->set_sample_sum(hist_stats.sampleSum()); + for (size_t i = 0; i < hist_stats.supportedBuckets().size(); i++) { + auto* bucket = histogram->add_bucket(); + bucket->set_upper_bound(hist_stats.supportedBuckets()[i]); + bucket->set_cumulative_count(hist_stats.computedBuckets()[i]); + } +} } // namespace MetricsService } // namespace StatSinks } // namespace Extensions diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h index 95abd89603a9c..6a59ef5f3484f 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h @@ -20,20 +20,23 @@ namespace Extensions { namespace StatSinks { namespace MetricsService { +using MetricsPtr = + std::unique_ptr>; + /** * Interface for metrics streamer. */ -template +template class GrpcMetricsStreamer : public Grpc::AsyncStreamCallbacks { public: + GrpcMetricsStreamer(Grpc::AsyncClientFactoryPtr&& factory) : client_(factory->create()) {} ~GrpcMetricsStreamer() override = default; /** * Send Metrics Message. * @param message supplies the metrics to send. */ - virtual void - send(Envoy::Protobuf::RepeatedPtrField& metrics) PURE; + virtual void send(MetricsPtr&& metrics) PURE; // Grpc::AsyncStreamCallbacks void onCreateInitialMetadata(Http::RequestHeaderMap&) override {} @@ -41,34 +44,35 @@ class GrpcMetricsStreamer : public Grpc::AsyncStreamCallbacks { void onReceiveMessage(std::unique_ptr&&) override {} void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {} void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override{}; + +protected: + Grpc::AsyncStream stream_{}; + Grpc::AsyncClient client_; }; -template -using GrpcMetricsStreamerSharedPtr = std::shared_ptr>; +template +using GrpcMetricsStreamerSharedPtr = + std::shared_ptr>; /** * Production implementation of GrpcMetricsStreamer */ class GrpcMetricsStreamerImpl : public Singleton::Instance, - public GrpcMetricsStreamer { + public GrpcMetricsStreamer { public: GrpcMetricsStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info, envoy::config::core::v3::ApiVersion transport_api_version); // GrpcMetricsStreamer - void - send(Envoy::Protobuf::RepeatedPtrField& metrics) override; + void send(MetricsPtr&& metrics) override; // Grpc::AsyncStreamCallbacks void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override { stream_ = nullptr; } private: - Grpc::AsyncStream stream_{}; - Grpc::AsyncClient - client_; const LocalInfo::LocalInfo& local_info_; const Protobuf::MethodDescriptor& service_method_; const envoy::config::core::v3::ApiVersion transport_api_version_; @@ -76,113 +80,46 @@ class GrpcMetricsStreamerImpl using GrpcMetricsStreamerImplPtr = std::unique_ptr; +class MetricsFlusher { +public: + MetricsFlusher(const bool report_counters_as_deltas) + : report_counters_as_deltas_(report_counters_as_deltas) {} + + MetricsPtr flush(Stats::MetricSnapshot& snapshot) const; + +private: + void flushCounter(io::prometheus::client::MetricFamily* metrics_family, + const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, + int64_t snapshot_time_ms) const; + void flushGauge(io::prometheus::client::MetricFamily* metrics_family, const Stats::Gauge& gauge, + int64_t snapshot_time_ms) const; + void flushHistogram(io::prometheus::client::MetricFamily* summary_metrics_family, + io::prometheus::client::MetricFamily* histogram_metrics_family, + const Stats::ParentHistogram& envoy_histogram, + int64_t snapshot_time_ms) const; + + const bool report_counters_as_deltas_; +}; + /** * Stat Sink that flushes metrics via a gRPC service. */ -template class MetricsServiceSink : public Stats::Sink { +template class MetricsServiceSink : public Stats::Sink { public: // MetricsService::Sink - MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, - const bool report_counters_as_deltas) - : grpc_metrics_streamer_(grpc_metrics_streamer), - report_counters_as_deltas_(report_counters_as_deltas) {} + MetricsServiceSink( + const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, + const bool report_counters_as_deltas) + : flusher_(report_counters_as_deltas), grpc_metrics_streamer_(grpc_metrics_streamer) {} + void flush(Stats::MetricSnapshot& snapshot) override { - metrics_.Clear(); - - // TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we - // actually preallocate the submessages and then pass ownership to the proto (rather than just - // preallocating the pointer array). - metrics_.Reserve(snapshot.counters().size() + snapshot.gauges().size() + - snapshot.histograms().size()); - int64_t snapshot_time_ms = std::chrono::duration_cast( - snapshot.snapshotTime().time_since_epoch()) - .count(); - for (const auto& counter : snapshot.counters()) { - if (counter.counter_.get().used()) { - flushCounter(counter, snapshot_time_ms); - } - } - - for (const auto& gauge : snapshot.gauges()) { - if (gauge.get().used()) { - flushGauge(gauge.get(), snapshot_time_ms); - } - } - - for (const auto& histogram : snapshot.histograms()) { - if (histogram.get().used()) { - flushHistogram(histogram.get(), snapshot_time_ms); - } - } - - grpc_metrics_streamer_->send(metrics_); + grpc_metrics_streamer_->send(std::move(flusher_.flush(snapshot))); } void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} private: - void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, - int64_t snapshot_time_ms) { - io::prometheus::client::MetricFamily* metrics_family = metrics_.Add(); - metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); - metrics_family->set_name(counter_snapshot.counter_.get().name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(snapshot_time_ms); - auto* counter_metric = metric->mutable_counter(); - if (report_counters_as_deltas_) { - counter_metric->set_value(counter_snapshot.delta_); - } else { - counter_metric->set_value(counter_snapshot.counter_.get().value()); - } - } - - void flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms) { - io::prometheus::client::MetricFamily* metrics_family = metrics_.Add(); - metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); - metrics_family->set_name(gauge.name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(snapshot_time_ms); - auto* gauge_metric = metric->mutable_gauge(); - gauge_metric->set_value(gauge.value()); - } - - void flushHistogram(const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms) { - // TODO(ramaraochavali): Currently we are sending both quantile information and bucket - // information. We should make this configurable if it turns out that sending both affects - // performance. - - // Add summary information for histograms. - io::prometheus::client::MetricFamily* summary_metrics_family = metrics_.Add(); - summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); - summary_metrics_family->set_name(envoy_histogram.name()); - auto* summary_metric = summary_metrics_family->add_metric(); - summary_metric->set_timestamp_ms(snapshot_time_ms); - auto* summary = summary_metric->mutable_summary(); - const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics(); - for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) { - auto* quantile = summary->add_quantile(); - quantile->set_quantile(hist_stats.supportedQuantiles()[i]); - quantile->set_value(hist_stats.computedQuantiles()[i]); - } - - // Add bucket information for histograms. - io::prometheus::client::MetricFamily* histogram_metrics_family = metrics_.Add(); - histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM); - histogram_metrics_family->set_name(envoy_histogram.name()); - auto* histogram_metric = histogram_metrics_family->add_metric(); - histogram_metric->set_timestamp_ms(snapshot_time_ms); - auto* histogram = histogram_metric->mutable_histogram(); - histogram->set_sample_count(hist_stats.sampleCount()); - histogram->set_sample_sum(hist_stats.sampleSum()); - for (size_t i = 0; i < hist_stats.supportedBuckets().size(); i++) { - auto* bucket = histogram->add_bucket(); - bucket->set_upper_bound(hist_stats.supportedBuckets()[i]); - bucket->set_cumulative_count(hist_stats.computedBuckets()[i]); - } - } - - GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; - Envoy::Protobuf::RepeatedPtrField metrics_; - const bool report_counters_as_deltas_; + const MetricsFlusher flusher_; + GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; }; } // namespace MetricsService diff --git a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc index c90a6f6048f37..ebcbec76c1fd7 100644 --- a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc +++ b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc @@ -62,8 +62,9 @@ TEST_F(GrpcMetricsStreamerImplTest, BasicFlow) { expectStreamStart(stream1, &callbacks1); EXPECT_CALL(local_info_, node()); EXPECT_CALL(stream1, sendMessageRaw_(_, false)); - Envoy::Protobuf::RepeatedPtrField metrics1; - streamer_->send(metrics1); + auto metrics = + std::make_unique>(); + streamer_->send(std::move(metrics)); // Verify that sending an empty response message doesn't do anything bad. callbacks1->onReceiveMessage( std::make_unique()); @@ -81,16 +82,22 @@ TEST_F(GrpcMetricsStreamerImplTest, StreamFailure) { return nullptr; })); EXPECT_CALL(local_info_, node()); - Envoy::Protobuf::RepeatedPtrField metrics1; - streamer_->send(metrics1); + auto metrics = + std::make_unique>(); + streamer_->send(std::move(metrics)); } class MockGrpcMetricsStreamer - : public GrpcMetricsStreamer { + : public GrpcMetricsStreamer { public: + MockGrpcMetricsStreamer(Grpc::AsyncClientFactoryPtr&& factory) + : GrpcMetricsStreamer( + std::move(factory)) {} + // GrpcMetricsStreamer - MOCK_METHOD(void, send, - (Envoy::Protobuf::RepeatedPtrField & metrics)); + MOCK_METHOD(void, send, (MetricsPtr && metrics)); }; class MetricsServiceSinkTest : public testing::Test { @@ -98,11 +105,14 @@ class MetricsServiceSinkTest : public testing::Test { MetricsServiceSinkTest() = default; NiceMock snapshot_; - std::shared_ptr streamer_{new MockGrpcMetricsStreamer()}; + std::shared_ptr streamer_{new MockGrpcMetricsStreamer( + Grpc::AsyncClientFactoryPtr{new NiceMock()})}; }; TEST_F(MetricsServiceSinkTest, CheckSendCall) { - MetricsServiceSink sink(streamer_, false); + MetricsServiceSink + sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -126,7 +136,9 @@ TEST_F(MetricsServiceSinkTest, CheckSendCall) { } TEST_F(MetricsServiceSinkTest, CheckStatsCount) { - MetricsServiceSink sink(streamer_, false); + MetricsServiceSink + sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -140,26 +152,24 @@ TEST_F(MetricsServiceSinkTest, CheckStatsCount) { gauge->used_ = true; snapshot_.gauges_.push_back(*gauge); - EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke( - [](Envoy::Protobuf::RepeatedPtrField& metrics) { - EXPECT_EQ(2, metrics.size()); - })); + EXPECT_CALL(*streamer_, send(_)).WillOnce(Invoke([](MetricsPtr&& metrics) { + EXPECT_EQ(2, metrics->size()); + })); sink.flush(snapshot_); // Verify only newly added metrics come after endFlush call. gauge->used_ = false; - EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke( - [](Envoy::Protobuf::RepeatedPtrField& metrics) { - EXPECT_EQ(1, metrics.size()); - })); + EXPECT_CALL(*streamer_, send(_)).WillOnce(Invoke([](MetricsPtr&& metrics) { + EXPECT_EQ(1, metrics->size()); + })); sink.flush(snapshot_); } // Test that verifies counters are correctly reported as current value when configured to do so. TEST_F(MetricsServiceSinkTest, ReportCountersValues) { - MetricsServiceSink sink(streamer_, false); + MetricsServiceSink + sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -167,18 +177,18 @@ TEST_F(MetricsServiceSinkTest, ReportCountersValues) { counter->used_ = true; snapshot_.counters_.push_back({1, *counter}); - EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke( - [](Envoy::Protobuf::RepeatedPtrField& metrics) { - EXPECT_EQ(1, metrics.size()); - EXPECT_EQ(100, metrics[0].metric(0).counter().value()); - })); + EXPECT_CALL(*streamer_, send(_)).WillOnce(Invoke([](MetricsPtr&& metrics) { + EXPECT_EQ(1, metrics->size()); + EXPECT_EQ(100, (*metrics)[0].metric(0).counter().value()); + })); sink.flush(snapshot_); } // Test that verifies counters are reported as the delta between flushes when configured to do so. TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) { - MetricsServiceSink sink(streamer_, true); + MetricsServiceSink + sink(streamer_, true); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -186,12 +196,10 @@ TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) { counter->used_ = true; snapshot_.counters_.push_back({1, *counter}); - EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke( - [](Envoy::Protobuf::RepeatedPtrField& metrics) { - EXPECT_EQ(1, metrics.size()); - EXPECT_EQ(1, metrics[0].metric(0).counter().value()); - })); + EXPECT_CALL(*streamer_, send(_)).WillOnce(Invoke([](MetricsPtr&& metrics) { + EXPECT_EQ(1, metrics->size()); + EXPECT_EQ(1, (*metrics)[0].metric(0).counter().value()); + })); sink.flush(snapshot_); } From c56c82dee79234f13ddff6a0f0eecd27c0a6d189 Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Tue, 10 Nov 2020 14:07:35 -0800 Subject: [PATCH 04/10] comments Signed-off-by: Jose Nino --- .../grpc_metrics_service_impl.cc | 40 +++++++++---------- .../grpc_metrics_service_impl.h | 14 +++---- .../grpc_metrics_service_impl_test.cc | 3 +- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index 7c52aebc01367..11734fe336fe2 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -22,7 +22,7 @@ GrpcMetricsStreamerImpl::GrpcMetricsStreamerImpl( Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info, envoy::config::core::v3::ApiVersion transport_api_version) : GrpcMetricsStreamer(std::move(factory)), + envoy::service::metrics::v3::StreamMetricsResponse>(*factory), local_info_(local_info), service_method_( Grpc::VersionedMethods("envoy.service.metrics.v3.MetricsService.StreamMetrics", @@ -60,31 +60,31 @@ MetricsPtr MetricsFlusher::flush(Stats::MetricSnapshot& snapshot) const { .count(); for (const auto& counter : snapshot.counters()) { if (counter.counter_.get().used()) { - flushCounter(metrics->Add(), counter, snapshot_time_ms); + flushCounter(*metrics->Add(), counter, snapshot_time_ms); } } for (const auto& gauge : snapshot.gauges()) { if (gauge.get().used()) { - flushGauge(metrics->Add(), gauge.get(), snapshot_time_ms); + flushGauge(*metrics->Add(), gauge.get(), snapshot_time_ms); } } for (const auto& histogram : snapshot.histograms()) { if (histogram.get().used()) { - flushHistogram(metrics->Add(), metrics->Add(), histogram.get(), snapshot_time_ms); + flushHistogram(*metrics->Add(), *metrics->Add(), histogram.get(), snapshot_time_ms); } } return metrics; } -void MetricsFlusher::flushCounter(io::prometheus::client::MetricFamily* metrics_family, +void MetricsFlusher::flushCounter(io::prometheus::client::MetricFamily& metrics_family, const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, int64_t snapshot_time_ms) const { - metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); - metrics_family->set_name(counter_snapshot.counter_.get().name()); - auto* metric = metrics_family->add_metric(); + metrics_family.set_type(io::prometheus::client::MetricType::COUNTER); + metrics_family.set_name(counter_snapshot.counter_.get().name()); + auto* metric = metrics_family.add_metric(); metric->set_timestamp_ms(snapshot_time_ms); auto* counter_metric = metric->mutable_counter(); if (report_counters_as_deltas_) { @@ -94,18 +94,18 @@ void MetricsFlusher::flushCounter(io::prometheus::client::MetricFamily* metrics_ } } -void MetricsFlusher::flushGauge(io::prometheus::client::MetricFamily* metrics_family, +void MetricsFlusher::flushGauge(io::prometheus::client::MetricFamily& metrics_family, const Stats::Gauge& gauge, int64_t snapshot_time_ms) const { - metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); - metrics_family->set_name(gauge.name()); - auto* metric = metrics_family->add_metric(); + metrics_family.set_type(io::prometheus::client::MetricType::GAUGE); + metrics_family.set_name(gauge.name()); + auto* metric = metrics_family.add_metric(); metric->set_timestamp_ms(snapshot_time_ms); auto* gauge_metric = metric->mutable_gauge(); gauge_metric->set_value(gauge.value()); } -void MetricsFlusher::flushHistogram(io::prometheus::client::MetricFamily* summary_metrics_family, - io::prometheus::client::MetricFamily* histogram_metrics_family, +void MetricsFlusher::flushHistogram(io::prometheus::client::MetricFamily& summary_metrics_family, + io::prometheus::client::MetricFamily& histogram_metrics_family, const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms) const { // TODO(ramaraochavali): Currently we are sending both quantile information and bucket @@ -113,9 +113,9 @@ void MetricsFlusher::flushHistogram(io::prometheus::client::MetricFamily* summar // performance. // Add summary information for histograms. - summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); - summary_metrics_family->set_name(envoy_histogram.name()); - auto* summary_metric = summary_metrics_family->add_metric(); + summary_metrics_family.set_type(io::prometheus::client::MetricType::SUMMARY); + summary_metrics_family.set_name(envoy_histogram.name()); + auto* summary_metric = summary_metrics_family.add_metric(); summary_metric->set_timestamp_ms(snapshot_time_ms); auto* summary = summary_metric->mutable_summary(); const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics(); @@ -126,9 +126,9 @@ void MetricsFlusher::flushHistogram(io::prometheus::client::MetricFamily* summar } // Add bucket information for histograms. - histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM); - histogram_metrics_family->set_name(envoy_histogram.name()); - auto* histogram_metric = histogram_metrics_family->add_metric(); + histogram_metrics_family.set_type(io::prometheus::client::MetricType::HISTOGRAM); + histogram_metrics_family.set_name(envoy_histogram.name()); + auto* histogram_metric = histogram_metrics_family.add_metric(); histogram_metric->set_timestamp_ms(snapshot_time_ms); auto* histogram = histogram_metric->mutable_histogram(); histogram->set_sample_count(hist_stats.sampleCount()); diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h index 6a59ef5f3484f..782d887caf7f7 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h @@ -29,7 +29,7 @@ using MetricsPtr = template class GrpcMetricsStreamer : public Grpc::AsyncStreamCallbacks { public: - GrpcMetricsStreamer(Grpc::AsyncClientFactoryPtr&& factory) : client_(factory->create()) {} + explicit GrpcMetricsStreamer(Grpc::AsyncClientFactory& factory) : client_(factory.create()) {} ~GrpcMetricsStreamer() override = default; /** @@ -82,19 +82,19 @@ using GrpcMetricsStreamerImplPtr = std::unique_ptr; class MetricsFlusher { public: - MetricsFlusher(const bool report_counters_as_deltas) + explicit MetricsFlusher(const bool report_counters_as_deltas) : report_counters_as_deltas_(report_counters_as_deltas) {} MetricsPtr flush(Stats::MetricSnapshot& snapshot) const; private: - void flushCounter(io::prometheus::client::MetricFamily* metrics_family, + void flushCounter(io::prometheus::client::MetricFamily& metrics_family, const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, int64_t snapshot_time_ms) const; - void flushGauge(io::prometheus::client::MetricFamily* metrics_family, const Stats::Gauge& gauge, + void flushGauge(io::prometheus::client::MetricFamily& metrics_family, const Stats::Gauge& gauge, int64_t snapshot_time_ms) const; - void flushHistogram(io::prometheus::client::MetricFamily* summary_metrics_family, - io::prometheus::client::MetricFamily* histogram_metrics_family, + void flushHistogram(io::prometheus::client::MetricFamily& summary_metrics_family, + io::prometheus::client::MetricFamily& histogram_metrics_family, const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms) const; @@ -113,7 +113,7 @@ template class MetricsServiceSink : pu : flusher_(report_counters_as_deltas), grpc_metrics_streamer_(grpc_metrics_streamer) {} void flush(Stats::MetricSnapshot& snapshot) override { - grpc_metrics_streamer_->send(std::move(flusher_.flush(snapshot))); + grpc_metrics_streamer_->send(flusher_.flush(snapshot)); } void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} diff --git a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc index ebcbec76c1fd7..bafd9bc579606 100644 --- a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc +++ b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc @@ -93,8 +93,7 @@ class MockGrpcMetricsStreamer public: MockGrpcMetricsStreamer(Grpc::AsyncClientFactoryPtr&& factory) : GrpcMetricsStreamer( - std::move(factory)) {} + envoy::service::metrics::v3::StreamMetricsResponse>(*factory) {} // GrpcMetricsStreamer MOCK_METHOD(void, send, (MetricsPtr && metrics)); From 531a38e48c71ba71f702f465d2d9ccff031e7260 Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Mon, 16 Nov 2020 11:30:53 -0800 Subject: [PATCH 05/10] missing check Signed-off-by: Jose Nino --- .../metrics_service/metrics_service_integration_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index ff46886b4cd7a..69e583ac2813a 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -75,7 +75,7 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio // required stats are flushed. // TODO(ramaraochavali): Figure out a more robust way to find out all required stats have been // flushed. - while (!(known_counter_exists && known_gauge_exists && known_histogram_exists)) { + while (!(known_counter_exists && known_gauge_exists && known_summary_exists && known_histogram_exists)) { envoy::service::metrics::v3::StreamMetricsMessage request_msg; VERIFY_ASSERTION(metrics_service_request_->waitForGrpcMessage(*dispatcher_, request_msg)); EXPECT_EQ("POST", metrics_service_request_->headers().getMethodValue()); From 70314fafdbaffd9bd0c5e93bc01845aedb8f0966 Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Mon, 16 Nov 2020 13:58:00 -0800 Subject: [PATCH 06/10] fmt Signed-off-by: Jose Nino --- .../metrics_service/metrics_service_integration_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index 69e583ac2813a..7ce25c95db85a 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -75,7 +75,8 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio // required stats are flushed. // TODO(ramaraochavali): Figure out a more robust way to find out all required stats have been // flushed. - while (!(known_counter_exists && known_gauge_exists && known_summary_exists && known_histogram_exists)) { + while (!(known_counter_exists && known_gauge_exists && known_summary_exists && + known_histogram_exists)) { envoy::service::metrics::v3::StreamMetricsMessage request_msg; VERIFY_ASSERTION(metrics_service_request_->waitForGrpcMessage(*dispatcher_, request_msg)); EXPECT_EQ("POST", metrics_service_request_->headers().getMethodValue()); From 93135335dfc065afaf37c7a6da77ba8dacaeceac Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Mon, 7 Dec 2020 12:17:54 -0800 Subject: [PATCH 07/10] test Signed-off-by: Jose Nino --- .../metrics_service/metrics_service_integration_test.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index 7216186f9caaf..be5caee7770d7 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -75,8 +75,7 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio // required stats are flushed. // TODO(ramaraochavali): Figure out a more robust way to find out all required stats have been // flushed. - while (!(known_counter_exists && known_gauge_exists && known_summary_exists && - known_histogram_exists)) { + while (!(known_counter_exists && known_gauge_exists && known_histogram_exists)) { envoy::service::metrics::v3::StreamMetricsMessage request_msg; VERIFY_ASSERTION(metrics_service_request_->waitForGrpcMessage(*dispatcher_, request_msg)); EXPECT_EQ("POST", metrics_service_request_->headers().getMethodValue()); @@ -126,7 +125,6 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio } EXPECT_TRUE(known_counter_exists); EXPECT_TRUE(known_gauge_exists); - EXPECT_TRUE(known_summary_exists); EXPECT_TRUE(known_histogram_exists); return AssertionSuccess(); From ddcb222b920e18de65f0f711190c0ce06332f536 Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Mon, 7 Dec 2020 14:31:45 -0800 Subject: [PATCH 08/10] fix Signed-off-by: Jose Nino --- .../metrics_service/metrics_service_integration_test.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index be5caee7770d7..a00d6d0a697fe 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -66,7 +66,6 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio ABSL_MUST_USE_RESULT AssertionResult waitForMetricsRequest() { - bool known_summary_exists = false; bool known_histogram_exists = false; bool known_counter_exists = false; bool known_gauge_exists = false; @@ -101,8 +100,7 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio } if (metrics_family.name() == "cluster.cluster_0.upstream_rq_time" && metrics_family.type() == ::io::prometheus::client::MetricType::SUMMARY) { - known_summary_exists = true; - Stats::HistogramStatisticsImpl empty_statistics; + Stats::HistogramStatisticsImpl empty_statistics; EXPECT_EQ(metrics_family.metric(0).summary().quantile_size(), empty_statistics.supportedQuantiles().size()); } From 6660a2d1658e10f73b3e4c2e9e7d22376c4e234d Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Mon, 7 Dec 2020 20:02:31 -0800 Subject: [PATCH 09/10] fmt Signed-off-by: Jose Nino --- .../metrics_service/metrics_service_integration_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index a00d6d0a697fe..683544fb2d23b 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -100,7 +100,7 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio } if (metrics_family.name() == "cluster.cluster_0.upstream_rq_time" && metrics_family.type() == ::io::prometheus::client::MetricType::SUMMARY) { - Stats::HistogramStatisticsImpl empty_statistics; + Stats::HistogramStatisticsImpl empty_statistics; EXPECT_EQ(metrics_family.metric(0).summary().quantile_size(), empty_statistics.supportedQuantiles().size()); } From b61ae9328c7ad2f1b8f5114a037720b7950f8269 Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Tue, 8 Dec 2020 12:22:34 -0800 Subject: [PATCH 10/10] add back Signed-off-by: Jose Nino --- .../metrics_service/metrics_service_integration_test.cc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index 86a4e1e9a79fe..81b89d4ff9bf1 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -66,6 +66,7 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio ABSL_MUST_USE_RESULT AssertionResult waitForMetricsRequest() { + bool known_summary_exists = false; bool known_histogram_exists = false; bool known_counter_exists = false; bool known_gauge_exists = false; @@ -74,7 +75,8 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio // required stats are flushed. // TODO(ramaraochavali): Figure out a more robust way to find out all required stats have been // flushed. - while (!(known_counter_exists && known_gauge_exists && known_histogram_exists)) { + while (!(known_counter_exists && known_gauge_exists && known_summary_exists && + known_histogram_exists)) { envoy::service::metrics::v3::StreamMetricsMessage request_msg; VERIFY_ASSERTION(metrics_service_request_->waitForGrpcMessage(*dispatcher_, request_msg)); EXPECT_EQ("POST", metrics_service_request_->headers().getMethodValue()); @@ -100,6 +102,7 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio } if (metrics_family.name() == "cluster.cluster_0.upstream_rq_time" && metrics_family.type() == ::io::prometheus::client::MetricType::SUMMARY) { + known_summary_exists = true; Stats::HistogramStatisticsImpl empty_statistics; EXPECT_EQ(metrics_family.metric(0).summary().quantile_size(), empty_statistics.supportedQuantiles().size()); @@ -116,13 +119,15 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio EXPECT_EQ(previous_time_stamp, metrics_family.metric(0).timestamp_ms()); } previous_time_stamp = metrics_family.metric(0).timestamp_ms(); - if (known_counter_exists && known_gauge_exists && known_histogram_exists) { + if (known_counter_exists && known_gauge_exists && known_summary_exists && + known_histogram_exists) { break; } } } EXPECT_TRUE(known_counter_exists); EXPECT_TRUE(known_gauge_exists); + EXPECT_TRUE(known_summary_exists); EXPECT_TRUE(known_histogram_exists); return AssertionSuccess();