diff --git a/source/extensions/stat_sinks/metrics_service/config.cc b/source/extensions/stat_sinks/metrics_service/config.cc index 05228e9a67c40..8b44af894b8b9 100644 --- a/source/extensions/stat_sinks/metrics_service/config.cc +++ b/source/extensions/stat_sinks/metrics_service/config.cc @@ -29,13 +29,15 @@ MetricsServiceSinkFactory::createStatsSink(const Protobuf::Message& config, const auto& transport_api_version = sink_config.transport_api_version(); ENVOY_LOG(debug, "Metrics Service gRPC service configuration: {}", grpc_service.DebugString()); - std::shared_ptr grpc_metrics_streamer = - std::make_shared( + std::shared_ptr> + grpc_metrics_streamer = std::make_shared( server.clusterManager().grpcAsyncClientManager().factoryForGrpcService( grpc_service, server.scope(), false), server.localInfo(), transport_api_version); - return std::make_unique( + return std::make_unique>( grpc_metrics_streamer, PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, report_counters_as_deltas, false)); } diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index d18bfbf20c838..11734fe336fe2 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -21,16 +21,23 @@ namespace MetricsService { GrpcMetricsStreamerImpl::GrpcMetricsStreamerImpl( Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info, envoy::config::core::v3::ApiVersion transport_api_version) - : client_(factory->create()), local_info_(local_info), + : GrpcMetricsStreamer(*factory), + local_info_(local_info), service_method_( Grpc::VersionedMethods("envoy.service.metrics.v3.MetricsService.StreamMetrics", "envoy.service.metrics.v2.MetricsService.StreamMetrics") .getMethodDescriptorForVersion(transport_api_version)), transport_api_version_(transport_api_version) {} -void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMessage& message) { +void GrpcMetricsStreamerImpl::send(MetricsPtr&& metrics) { + envoy::service::metrics::v3::StreamMetricsMessage message; + message.mutable_envoy_metrics()->Reserve(metrics->size()); + message.mutable_envoy_metrics()->MergeFrom(*metrics); + if (stream_ == nullptr) { stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions()); + // For perf reasons, the identifier is only sent on establishing the stream. auto* identifier = message.mutable_identifier(); *identifier->mutable_node() = local_info_.node(); } @@ -39,17 +46,45 @@ void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMes } } -MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, - const bool report_counters_as_deltas) - : grpc_metrics_streamer_(grpc_metrics_streamer), - report_counters_as_deltas_(report_counters_as_deltas) {} - -void MetricsServiceSink::flushCounter( - const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, int64_t snapshot_time_ms) { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); - metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); - metrics_family->set_name(counter_snapshot.counter_.get().name()); - auto* metric = metrics_family->add_metric(); +MetricsPtr MetricsFlusher::flush(Stats::MetricSnapshot& snapshot) const { + auto metrics = + std::make_unique>(); + + // TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we + // actually preallocate the submessages and then pass ownership to the proto (rather than just + // preallocating the pointer array). + metrics->Reserve(snapshot.counters().size() + snapshot.gauges().size() + + snapshot.histograms().size()); + int64_t snapshot_time_ms = std::chrono::duration_cast( + snapshot.snapshotTime().time_since_epoch()) + .count(); + for (const auto& counter : snapshot.counters()) { + if (counter.counter_.get().used()) { + flushCounter(*metrics->Add(), counter, snapshot_time_ms); + } + } + + for (const auto& gauge : snapshot.gauges()) { + if (gauge.get().used()) { + flushGauge(*metrics->Add(), gauge.get(), snapshot_time_ms); + } + } + + for (const auto& histogram : snapshot.histograms()) { + if (histogram.get().used()) { + flushHistogram(*metrics->Add(), *metrics->Add(), histogram.get(), snapshot_time_ms); + } + } + + return metrics; +} + +void MetricsFlusher::flushCounter(io::prometheus::client::MetricFamily& metrics_family, + const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, + int64_t snapshot_time_ms) const { + metrics_family.set_type(io::prometheus::client::MetricType::COUNTER); + metrics_family.set_name(counter_snapshot.counter_.get().name()); + auto* metric = metrics_family.add_metric(); metric->set_timestamp_ms(snapshot_time_ms); auto* counter_metric = metric->mutable_counter(); if (report_counters_as_deltas_) { @@ -59,27 +94,28 @@ void MetricsServiceSink::flushCounter( } } -void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms) { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); - metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); - metrics_family->set_name(gauge.name()); - auto* metric = metrics_family->add_metric(); +void MetricsFlusher::flushGauge(io::prometheus::client::MetricFamily& metrics_family, + const Stats::Gauge& gauge, int64_t snapshot_time_ms) const { + metrics_family.set_type(io::prometheus::client::MetricType::GAUGE); + metrics_family.set_name(gauge.name()); + auto* metric = metrics_family.add_metric(); metric->set_timestamp_ms(snapshot_time_ms); auto* gauge_metric = metric->mutable_gauge(); gauge_metric->set_value(gauge.value()); } -void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_histogram, - int64_t snapshot_time_ms) { +void MetricsFlusher::flushHistogram(io::prometheus::client::MetricFamily& summary_metrics_family, + io::prometheus::client::MetricFamily& histogram_metrics_family, + const Stats::ParentHistogram& envoy_histogram, + int64_t snapshot_time_ms) const { // TODO(ramaraochavali): Currently we are sending both quantile information and bucket // information. We should make this configurable if it turns out that sending both affects // performance. // Add summary information for histograms. - io::prometheus::client::MetricFamily* summary_metrics_family = message_.add_envoy_metrics(); - summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); - summary_metrics_family->set_name(envoy_histogram.name()); - auto* summary_metric = summary_metrics_family->add_metric(); + summary_metrics_family.set_type(io::prometheus::client::MetricType::SUMMARY); + summary_metrics_family.set_name(envoy_histogram.name()); + auto* summary_metric = summary_metrics_family.add_metric(); summary_metric->set_timestamp_ms(snapshot_time_ms); auto* summary = summary_metric->mutable_summary(); const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics(); @@ -90,10 +126,9 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist } // Add bucket information for histograms. - io::prometheus::client::MetricFamily* histogram_metrics_family = message_.add_envoy_metrics(); - histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM); - histogram_metrics_family->set_name(envoy_histogram.name()); - auto* histogram_metric = histogram_metrics_family->add_metric(); + histogram_metrics_family.set_type(io::prometheus::client::MetricType::HISTOGRAM); + histogram_metrics_family.set_name(envoy_histogram.name()); + auto* histogram_metric = histogram_metrics_family.add_metric(); histogram_metric->set_timestamp_ms(snapshot_time_ms); auto* histogram = histogram_metric->mutable_histogram(); histogram->set_sample_count(hist_stats.sampleCount()); @@ -104,43 +139,6 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist bucket->set_cumulative_count(hist_stats.computedBuckets()[i]); } } - -void MetricsServiceSink::flush(Stats::MetricSnapshot& snapshot) { - message_.clear_envoy_metrics(); - - // TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we - // actually preallocate the submessages and then pass ownership to the proto (rather than just - // preallocating the pointer array). - message_.mutable_envoy_metrics()->Reserve(snapshot.counters().size() + snapshot.gauges().size() + - snapshot.histograms().size()); - int64_t snapshot_time_ms = std::chrono::duration_cast( - snapshot.snapshotTime().time_since_epoch()) - .count(); - for (const auto& counter : snapshot.counters()) { - if (counter.counter_.get().used()) { - flushCounter(counter, snapshot_time_ms); - } - } - - for (const auto& gauge : snapshot.gauges()) { - if (gauge.get().used()) { - flushGauge(gauge.get(), snapshot_time_ms); - } - } - - for (const auto& histogram : snapshot.histograms()) { - if (histogram.get().used()) { - flushHistogram(histogram.get(), snapshot_time_ms); - } - } - - grpc_metrics_streamer_->send(message_); - // for perf reasons, clear the identifier after the first flush. - if (message_.has_identifier()) { - message_.clear_identifier(); - } -} - } // namespace MetricsService } // namespace StatSinks } // namespace Extensions diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h index 668c7f3fb5676..782d887caf7f7 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h @@ -20,52 +20,59 @@ namespace Extensions { namespace StatSinks { namespace MetricsService { +using MetricsPtr = + std::unique_ptr>; + /** * Interface for metrics streamer. */ -class GrpcMetricsStreamer - : public Grpc::AsyncStreamCallbacks { +template +class GrpcMetricsStreamer : public Grpc::AsyncStreamCallbacks { public: + explicit GrpcMetricsStreamer(Grpc::AsyncClientFactory& factory) : client_(factory.create()) {} ~GrpcMetricsStreamer() override = default; /** * Send Metrics Message. * @param message supplies the metrics to send. */ - virtual void send(envoy::service::metrics::v3::StreamMetricsMessage& message) PURE; + virtual void send(MetricsPtr&& metrics) PURE; // Grpc::AsyncStreamCallbacks void onCreateInitialMetadata(Http::RequestHeaderMap&) override {} void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {} - void - onReceiveMessage(std::unique_ptr&&) override { - } + void onReceiveMessage(std::unique_ptr&&) override {} void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {} void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override{}; + +protected: + Grpc::AsyncStream stream_{}; + Grpc::AsyncClient client_; }; -using GrpcMetricsStreamerSharedPtr = std::shared_ptr; +template +using GrpcMetricsStreamerSharedPtr = + std::shared_ptr>; /** * Production implementation of GrpcMetricsStreamer */ -class GrpcMetricsStreamerImpl : public Singleton::Instance, public GrpcMetricsStreamer { +class GrpcMetricsStreamerImpl + : public Singleton::Instance, + public GrpcMetricsStreamer { public: GrpcMetricsStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info, envoy::config::core::v3::ApiVersion transport_api_version); // GrpcMetricsStreamer - void send(envoy::service::metrics::v3::StreamMetricsMessage& message) override; + void send(MetricsPtr&& metrics) override; // Grpc::AsyncStreamCallbacks void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override { stream_ = nullptr; } private: - Grpc::AsyncStream stream_{}; - Grpc::AsyncClient - client_; const LocalInfo::LocalInfo& local_info_; const Protobuf::MethodDescriptor& service_method_; const envoy::config::core::v3::ApiVersion transport_api_version_; @@ -73,26 +80,46 @@ class GrpcMetricsStreamerImpl : public Singleton::Instance, public GrpcMetricsSt using GrpcMetricsStreamerImplPtr = std::unique_ptr; +class MetricsFlusher { +public: + explicit MetricsFlusher(const bool report_counters_as_deltas) + : report_counters_as_deltas_(report_counters_as_deltas) {} + + MetricsPtr flush(Stats::MetricSnapshot& snapshot) const; + +private: + void flushCounter(io::prometheus::client::MetricFamily& metrics_family, + const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, + int64_t snapshot_time_ms) const; + void flushGauge(io::prometheus::client::MetricFamily& metrics_family, const Stats::Gauge& gauge, + int64_t snapshot_time_ms) const; + void flushHistogram(io::prometheus::client::MetricFamily& summary_metrics_family, + io::prometheus::client::MetricFamily& histogram_metrics_family, + const Stats::ParentHistogram& envoy_histogram, + int64_t snapshot_time_ms) const; + + const bool report_counters_as_deltas_; +}; + /** - * Stat Sink implementation of Metrics Service. + * Stat Sink that flushes metrics via a gRPC service. */ -class MetricsServiceSink : public Stats::Sink { +template class MetricsServiceSink : public Stats::Sink { public: // MetricsService::Sink - MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, - const bool report_counters_as_deltas); - void flush(Stats::MetricSnapshot& snapshot) override; - void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} + MetricsServiceSink( + const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, + const bool report_counters_as_deltas) + : flusher_(report_counters_as_deltas), grpc_metrics_streamer_(grpc_metrics_streamer) {} - void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, - int64_t snapshot_time_ms); - void flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms); - void flushHistogram(const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms); + void flush(Stats::MetricSnapshot& snapshot) override { + grpc_metrics_streamer_->send(flusher_.flush(snapshot)); + } + void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} private: - GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; - envoy::service::metrics::v3::StreamMetricsMessage message_; - const bool report_counters_as_deltas_; + const MetricsFlusher flusher_; + GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; }; } // namespace MetricsService diff --git a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc index 927b710ac7562..bafd9bc579606 100644 --- a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc +++ b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc @@ -62,8 +62,9 @@ TEST_F(GrpcMetricsStreamerImplTest, BasicFlow) { expectStreamStart(stream1, &callbacks1); EXPECT_CALL(local_info_, node()); EXPECT_CALL(stream1, sendMessageRaw_(_, false)); - envoy::service::metrics::v3::StreamMetricsMessage message_metrics1; - streamer_->send(message_metrics1); + auto metrics = + std::make_unique>(); + streamer_->send(std::move(metrics)); // Verify that sending an empty response message doesn't do anything bad. callbacks1->onReceiveMessage( std::make_unique()); @@ -81,14 +82,21 @@ TEST_F(GrpcMetricsStreamerImplTest, StreamFailure) { return nullptr; })); EXPECT_CALL(local_info_, node()); - envoy::service::metrics::v3::StreamMetricsMessage message_metrics1; - streamer_->send(message_metrics1); + auto metrics = + std::make_unique>(); + streamer_->send(std::move(metrics)); } -class MockGrpcMetricsStreamer : public GrpcMetricsStreamer { +class MockGrpcMetricsStreamer + : public GrpcMetricsStreamer { public: + MockGrpcMetricsStreamer(Grpc::AsyncClientFactoryPtr&& factory) + : GrpcMetricsStreamer(*factory) {} + // GrpcMetricsStreamer - MOCK_METHOD(void, send, (envoy::service::metrics::v3::StreamMetricsMessage & message)); + MOCK_METHOD(void, send, (MetricsPtr && metrics)); }; class MetricsServiceSinkTest : public testing::Test { @@ -96,11 +104,14 @@ class MetricsServiceSinkTest : public testing::Test { MetricsServiceSinkTest() = default; NiceMock snapshot_; - std::shared_ptr streamer_{new MockGrpcMetricsStreamer()}; + std::shared_ptr streamer_{new MockGrpcMetricsStreamer( + Grpc::AsyncClientFactoryPtr{new NiceMock()})}; }; TEST_F(MetricsServiceSinkTest, CheckSendCall) { - MetricsServiceSink sink(streamer_, false); + MetricsServiceSink + sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -124,7 +135,9 @@ TEST_F(MetricsServiceSinkTest, CheckSendCall) { } TEST_F(MetricsServiceSinkTest, CheckStatsCount) { - MetricsServiceSink sink(streamer_, false); + MetricsServiceSink + sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -138,24 +151,24 @@ TEST_F(MetricsServiceSinkTest, CheckStatsCount) { gauge->used_ = true; snapshot_.gauges_.push_back(*gauge); - EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) { - EXPECT_EQ(2, message.envoy_metrics_size()); - })); + EXPECT_CALL(*streamer_, send(_)).WillOnce(Invoke([](MetricsPtr&& metrics) { + EXPECT_EQ(2, metrics->size()); + })); sink.flush(snapshot_); // Verify only newly added metrics come after endFlush call. gauge->used_ = false; - EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) { - EXPECT_EQ(1, message.envoy_metrics_size()); - })); + EXPECT_CALL(*streamer_, send(_)).WillOnce(Invoke([](MetricsPtr&& metrics) { + EXPECT_EQ(1, metrics->size()); + })); sink.flush(snapshot_); } // Test that verifies counters are correctly reported as current value when configured to do so. TEST_F(MetricsServiceSinkTest, ReportCountersValues) { - MetricsServiceSink sink(streamer_, false); + MetricsServiceSink + sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -163,17 +176,18 @@ TEST_F(MetricsServiceSinkTest, ReportCountersValues) { counter->used_ = true; snapshot_.counters_.push_back({1, *counter}); - EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) { - EXPECT_EQ(1, message.envoy_metrics_size()); - EXPECT_EQ(100, message.envoy_metrics(0).metric(0).counter().value()); - })); + EXPECT_CALL(*streamer_, send(_)).WillOnce(Invoke([](MetricsPtr&& metrics) { + EXPECT_EQ(1, metrics->size()); + EXPECT_EQ(100, (*metrics)[0].metric(0).counter().value()); + })); sink.flush(snapshot_); } // Test that verifies counters are reported as the delta between flushes when configured to do so. TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) { - MetricsServiceSink sink(streamer_, true); + MetricsServiceSink + sink(streamer_, true); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -181,11 +195,10 @@ TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) { counter->used_ = true; snapshot_.counters_.push_back({1, *counter}); - EXPECT_CALL(*streamer_, send(_)) - .WillOnce(Invoke([](envoy::service::metrics::v3::StreamMetricsMessage& message) { - EXPECT_EQ(1, message.envoy_metrics_size()); - EXPECT_EQ(1, message.envoy_metrics(0).metric(0).counter().value()); - })); + EXPECT_CALL(*streamer_, send(_)).WillOnce(Invoke([](MetricsPtr&& metrics) { + EXPECT_EQ(1, metrics->size()); + EXPECT_EQ(1, (*metrics)[0].metric(0).counter().value()); + })); sink.flush(snapshot_); } diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index 8fc20ca8390a6..81b89d4ff9bf1 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -75,7 +75,8 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio // required stats are flushed. // TODO(ramaraochavali): Figure out a more robust way to find out all required stats have been // flushed. - while (!(known_counter_exists && known_gauge_exists && known_histogram_exists)) { + while (!(known_counter_exists && known_gauge_exists && known_summary_exists && + known_histogram_exists)) { envoy::service::metrics::v3::StreamMetricsMessage request_msg; VERIFY_ASSERTION(metrics_service_request_->waitForGrpcMessage(*dispatcher_, request_msg)); EXPECT_EQ("POST", metrics_service_request_->headers().getMethodValue()); @@ -118,7 +119,8 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio EXPECT_EQ(previous_time_stamp, metrics_family.metric(0).timestamp_ms()); } previous_time_stamp = metrics_family.metric(0).timestamp_ms(); - if (known_counter_exists && known_gauge_exists && known_histogram_exists) { + if (known_counter_exists && known_gauge_exists && known_summary_exists && + known_histogram_exists) { break; } }