From ad23f20392f26350fec7afca229729c2ef9ef92b Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 15 Aug 2022 16:19:34 -0700 Subject: [PATCH] METRICS SDK - Calling Observable Instruments callback during metrics collection (#1554) --- .../sdk/metrics/async_instruments.h | 5 +- sdk/include/opentelemetry/sdk/metrics/meter.h | 2 + sdk/src/metrics/async_instruments.cc | 6 +- sdk/src/metrics/meter.cc | 19 +++-- sdk/test/metrics/CMakeLists.txt | 1 + sdk/test/metrics/async_instruments_test.cc | 4 +- sdk/test/metrics/meter_test.cc | 75 +++++++++++++++++++ 7 files changed, 100 insertions(+), 12 deletions(-) create mode 100644 sdk/test/metrics/meter_test.cc diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h index f14d8f3a27..b32f68fa1f 100644 --- a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -21,7 +21,8 @@ class ObservableInstrument : public opentelemetry::metrics::ObservableInstrument { public: ObservableInstrument(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage); + std::unique_ptr storage, + std::shared_ptr observable_registry); void AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback, void *state) noexcept override; @@ -36,7 +37,7 @@ class ObservableInstrument : public opentelemetry::metrics::ObservableInstrument private: InstrumentDescriptor instrument_descriptor_; std::unique_ptr storage_; - std::unique_ptr observable_registry_; + std::shared_ptr observable_registry_; }; } // namespace metrics } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/metrics/meter.h b/sdk/include/opentelemetry/sdk/metrics/meter.h index c6bce29205..ef0d5ffb4e 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter.h @@ -24,6 +24,7 @@ namespace metrics class MetricStorage; class SyncWritableMetricStorage; class AsyncWritableMetricsStorge; +class ObservableRegistry; class Meter final : public opentelemetry::metrics::Meter { @@ -114,6 +115,7 @@ class Meter final : public opentelemetry::metrics::Meter std::weak_ptr meter_context_; // Mapping between instrument-name and Aggregation Storage. std::unordered_map> storage_registry_; + std::shared_ptr observable_registry_; std::unique_ptr RegisterSyncMetricStorage( InstrumentDescriptor &instrument_descriptor); std::unique_ptr RegisterAsyncMetricStorage( diff --git a/sdk/src/metrics/async_instruments.cc b/sdk/src/metrics/async_instruments.cc index e41cb304a8..ee38a3536b 100644 --- a/sdk/src/metrics/async_instruments.cc +++ b/sdk/src/metrics/async_instruments.cc @@ -15,10 +15,12 @@ namespace metrics { ObservableInstrument::ObservableInstrument(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage) + std::unique_ptr storage, + std::shared_ptr observable_registry) : instrument_descriptor_(instrument_descriptor), storage_(std::move(storage)), - observable_registry_{new ObservableRegistry()} + observable_registry_{observable_registry} + {} void ObservableInstrument::AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback, diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index 3f6b1aa4a1..62f6eaf313 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -8,7 +8,9 @@ # include "opentelemetry/sdk/metrics/async_instruments.h" # include "opentelemetry/sdk/metrics/exemplar/no_exemplar_reservoir.h" # include "opentelemetry/sdk/metrics/state/multi_metric_storage.h" +# include "opentelemetry/sdk/metrics/state/observable_registry.h" # include "opentelemetry/sdk/metrics/state/sync_metric_storage.h" + # include "opentelemetry/sdk/metrics/sync_instruments.h" # include "opentelemetry/sdk_config.h" @@ -26,7 +28,9 @@ namespace nostd = opentelemetry::nostd; Meter::Meter( std::weak_ptr meter_context, std::unique_ptr instrumentation_scope) noexcept - : scope_{std::move(instrumentation_scope)}, meter_context_{meter_context} + : scope_{std::move(instrumentation_scope)}, + meter_context_{meter_context}, + observable_registry_(new ObservableRegistry()) {} nostd::shared_ptr> Meter::CreateLongCounter(nostd::string_view name, @@ -66,7 +70,7 @@ nostd::shared_ptr Meter::CreateLon InstrumentValueType::kLong}; auto storage = RegisterAsyncMetricStorage(instrument_descriptor); return nostd::shared_ptr{ - new ObservableInstrument(instrument_descriptor, std::move(storage))}; + new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)}; } nostd::shared_ptr @@ -80,7 +84,7 @@ Meter::CreateDoubleObservableCounter(nostd::string_view name, InstrumentValueType::kDouble}; auto storage = RegisterAsyncMetricStorage(instrument_descriptor); return nostd::shared_ptr{ - new ObservableInstrument(instrument_descriptor, std::move(storage))}; + new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)}; } nostd::shared_ptr> Meter::CreateLongHistogram( @@ -122,7 +126,7 @@ nostd::shared_ptr Meter::CreateLon InstrumentValueType::kLong}; auto storage = RegisterAsyncMetricStorage(instrument_descriptor); return nostd::shared_ptr{ - new ObservableInstrument(instrument_descriptor, std::move(storage))}; + new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)}; } nostd::shared_ptr Meter::CreateDoubleObservableGauge( @@ -136,7 +140,7 @@ nostd::shared_ptr Meter::CreateDou InstrumentValueType::kDouble}; auto storage = RegisterAsyncMetricStorage(instrument_descriptor); return nostd::shared_ptr{ - new ObservableInstrument(instrument_descriptor, std::move(storage))}; + new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)}; } nostd::shared_ptr> Meter::CreateLongUpDownCounter( @@ -178,7 +182,7 @@ Meter::CreateLongObservableUpDownCounter(nostd::string_view name, InstrumentValueType::kLong}; auto storage = RegisterAsyncMetricStorage(instrument_descriptor); return nostd::shared_ptr{ - new ObservableInstrument(instrument_descriptor, std::move(storage))}; + new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)}; } nostd::shared_ptr @@ -192,7 +196,7 @@ Meter::CreateDoubleObservableUpDownCounter(nostd::string_view name, InstrumentValueType::kDouble}; auto storage = RegisterAsyncMetricStorage(instrument_descriptor); return nostd::shared_ptr{ - new ObservableInstrument(instrument_descriptor, std::move(storage))}; + new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)}; } const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentationScope() @@ -290,6 +294,7 @@ std::vector Meter::Collect(CollectorHandle *collector, opentelemetry::common::SystemTimestamp collect_ts) noexcept { + observable_registry_->Observe(collect_ts); std::vector metric_data_list; auto ctx = meter_context_.lock(); if (!ctx) diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index 7038506739..d57a8101a6 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -1,6 +1,7 @@ foreach( testname meter_provider_sdk_test + meter_test view_registry_test aggregation_test attributes_processor_test diff --git a/sdk/test/metrics/async_instruments_test.cc b/sdk/test/metrics/async_instruments_test.cc index 67f5d919cd..42340b1334 100644 --- a/sdk/test/metrics/async_instruments_test.cc +++ b/sdk/test/metrics/async_instruments_test.cc @@ -19,8 +19,10 @@ TEST(AsyncInstruments, ObservableInstrument) InstrumentDescriptor instrument_descriptor = {"long_counter", "description", "1", InstrumentType::kObservableCounter, InstrumentValueType::kLong}; + std::shared_ptr observable_registry(new ObservableRegistry()); std::unique_ptr metric_storage(new AsyncMultiMetricStorage()); - ObservableInstrument observable_counter_long(instrument_descriptor, std::move(metric_storage)); + ObservableInstrument observable_counter_long(instrument_descriptor, std::move(metric_storage), + observable_registry); observable_counter_long.AddCallback(asyc_generate_measurements, nullptr); } diff --git a/sdk/test/metrics/meter_test.cc b/sdk/test/metrics/meter_test.cc new file mode 100644 index 0000000000..7633270fbd --- /dev/null +++ b/sdk/test/metrics/meter_test.cc @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/meter.h" +# include "opentelemetry/sdk/metrics/data/point_data.h" +# include "opentelemetry/sdk/metrics/meter_context.h" +# include "opentelemetry/sdk/metrics/meter_provider.h" +# include "opentelemetry/sdk/metrics/metric_reader.h" + +# include + +using namespace opentelemetry; +using namespace opentelemetry::sdk::instrumentationscope; +using namespace opentelemetry::sdk::metrics; + +class MockMetricReader : public MetricReader +{ +public: + MockMetricReader() = default; + AggregationTemporality GetAggregationTemporality( + InstrumentType instrument_type) const noexcept override + { + return AggregationTemporality::kCumulative; + } + bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; } + bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; } + void OnInitialized() noexcept override {} +}; + +namespace +{ +nostd::shared_ptr InitMeter(MetricReader **metricReaderPtr) +{ + static std::shared_ptr provider(new MeterProvider()); + std::unique_ptr metric_reader(new MockMetricReader()); + *metricReaderPtr = metric_reader.get(); + auto p = std::static_pointer_cast(provider); + p->AddMetricReader(std::move(metric_reader)); + auto meter = provider->GetMeter("meter_name"); + return meter; +} +} // namespace + +void asyc_generate_measurements(opentelemetry::metrics::ObserverResult observer, void *state) +{ + auto observer_long = + nostd::get>>(observer); + observer_long->Observe(10l); +} + +TEST(MeterTest, BasicAsyncTests) +{ + MetricReader *metric_reader_ptr = nullptr; + auto meter = InitMeter(&metric_reader_ptr); + auto observable_counter = meter->CreateLongObservableCounter("observable_counter"); + observable_counter->AddCallback(asyc_generate_measurements, nullptr); + + size_t count = 0; + metric_reader_ptr->Collect([&count](ResourceMetrics &metric_data) { + EXPECT_EQ(metric_data.scope_metric_data_.size(), 1); + if (metric_data.scope_metric_data_.size()) + { + EXPECT_EQ(metric_data.scope_metric_data_[0].metric_data_.size(), 1); + if (metric_data.scope_metric_data_.size()) + { + count += metric_data.scope_metric_data_[0].metric_data_.size(); + EXPECT_EQ(count, 1); + } + } + return true; + }); +} + +#endif