Skip to content

Commit

Permalink
METRICS SDK - Calling Observable Instruments callback during metrics …
Browse files Browse the repository at this point in the history
…collection (open-telemetry#1554)
  • Loading branch information
lalitb authored and yxue committed Dec 5, 2022
1 parent 2412638 commit 73c1cb3
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 12 deletions.
5 changes: 3 additions & 2 deletions sdk/include/opentelemetry/sdk/metrics/async_instruments.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class ObservableInstrument : public opentelemetry::metrics::ObservableInstrument
{
public:
ObservableInstrument(InstrumentDescriptor instrument_descriptor,
std::unique_ptr<AsyncWritableMetricStorage> storage);
std::unique_ptr<AsyncWritableMetricStorage> storage,
std::shared_ptr<ObservableRegistry> observable_registry);

void AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback,
void *state) noexcept override;
Expand All @@ -36,7 +37,7 @@ class ObservableInstrument : public opentelemetry::metrics::ObservableInstrument
private:
InstrumentDescriptor instrument_descriptor_;
std::unique_ptr<AsyncWritableMetricStorage> storage_;
std::unique_ptr<ObservableRegistry> observable_registry_;
std::shared_ptr<ObservableRegistry> observable_registry_;
};
} // namespace metrics
} // namespace sdk
Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/meter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace metrics
class MetricStorage;
class SyncWritableMetricStorage;
class AsyncWritableMetricsStorge;
class ObservableRegistry;

class Meter final : public opentelemetry::metrics::Meter
{
Expand Down Expand Up @@ -114,6 +115,7 @@ class Meter final : public opentelemetry::metrics::Meter
std::weak_ptr<sdk::metrics::MeterContext> meter_context_;
// Mapping between instrument-name and Aggregation Storage.
std::unordered_map<std::string, std::shared_ptr<MetricStorage>> storage_registry_;
std::shared_ptr<ObservableRegistry> observable_registry_;
std::unique_ptr<SyncWritableMetricStorage> RegisterSyncMetricStorage(
InstrumentDescriptor &instrument_descriptor);
std::unique_ptr<AsyncWritableMetricStorage> RegisterAsyncMetricStorage(
Expand Down
6 changes: 4 additions & 2 deletions sdk/src/metrics/async_instruments.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ namespace metrics
{

ObservableInstrument::ObservableInstrument(InstrumentDescriptor instrument_descriptor,
std::unique_ptr<AsyncWritableMetricStorage> storage)
std::unique_ptr<AsyncWritableMetricStorage> storage,
std::shared_ptr<ObservableRegistry> observable_registry)
: instrument_descriptor_(instrument_descriptor),
storage_(std::move(storage)),
observable_registry_{new ObservableRegistry()}
observable_registry_{observable_registry}

{}

void ObservableInstrument::AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback,
Expand Down
19 changes: 12 additions & 7 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
# include "opentelemetry/sdk/metrics/async_instruments.h"
# include "opentelemetry/sdk/metrics/exemplar/no_exemplar_reservoir.h"
# include "opentelemetry/sdk/metrics/state/multi_metric_storage.h"
# include "opentelemetry/sdk/metrics/state/observable_registry.h"
# include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"

# include "opentelemetry/sdk/metrics/sync_instruments.h"
# include "opentelemetry/sdk_config.h"

Expand All @@ -26,7 +28,9 @@ namespace nostd = opentelemetry::nostd;
Meter::Meter(
std::weak_ptr<MeterContext> meter_context,
std::unique_ptr<sdk::instrumentationscope::InstrumentationScope> instrumentation_scope) noexcept
: scope_{std::move(instrumentation_scope)}, meter_context_{meter_context}
: scope_{std::move(instrumentation_scope)},
meter_context_{meter_context},
observable_registry_(new ObservableRegistry())
{}

nostd::shared_ptr<metrics::Counter<long>> Meter::CreateLongCounter(nostd::string_view name,
Expand Down Expand Up @@ -66,7 +70,7 @@ nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument> Meter::CreateLon
InstrumentValueType::kLong};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>
Expand All @@ -80,7 +84,7 @@ Meter::CreateDoubleObservableCounter(nostd::string_view name,
InstrumentValueType::kDouble};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<metrics::Histogram<long>> Meter::CreateLongHistogram(
Expand Down Expand Up @@ -122,7 +126,7 @@ nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument> Meter::CreateLon
InstrumentValueType::kLong};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument> Meter::CreateDoubleObservableGauge(
Expand All @@ -136,7 +140,7 @@ nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument> Meter::CreateDou
InstrumentValueType::kDouble};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<metrics::UpDownCounter<long>> Meter::CreateLongUpDownCounter(
Expand Down Expand Up @@ -178,7 +182,7 @@ Meter::CreateLongObservableUpDownCounter(nostd::string_view name,
InstrumentValueType::kLong};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>
Expand All @@ -192,7 +196,7 @@ Meter::CreateDoubleObservableUpDownCounter(nostd::string_view name,
InstrumentValueType::kDouble};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentationScope()
Expand Down Expand Up @@ -290,6 +294,7 @@ std::vector<MetricData> Meter::Collect(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp collect_ts) noexcept
{

observable_registry_->Observe(collect_ts);
std::vector<MetricData> metric_data_list;
auto ctx = meter_context_.lock();
if (!ctx)
Expand Down
1 change: 1 addition & 0 deletions sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
foreach(
testname
meter_provider_sdk_test
meter_test
view_registry_test
aggregation_test
attributes_processor_test
Expand Down
4 changes: 3 additions & 1 deletion sdk/test/metrics/async_instruments_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ TEST(AsyncInstruments, ObservableInstrument)
InstrumentDescriptor instrument_descriptor = {"long_counter", "description", "1",
InstrumentType::kObservableCounter,
InstrumentValueType::kLong};
std::shared_ptr<ObservableRegistry> observable_registry(new ObservableRegistry());
std::unique_ptr<AsyncWritableMetricStorage> metric_storage(new AsyncMultiMetricStorage());
ObservableInstrument observable_counter_long(instrument_descriptor, std::move(metric_storage));
ObservableInstrument observable_counter_long(instrument_descriptor, std::move(metric_storage),
observable_registry);
observable_counter_long.AddCallback(asyc_generate_measurements, nullptr);
}

Expand Down
75 changes: 75 additions & 0 deletions sdk/test/metrics/meter_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/sdk/metrics/meter.h"
# include "opentelemetry/sdk/metrics/data/point_data.h"
# include "opentelemetry/sdk/metrics/meter_context.h"
# include "opentelemetry/sdk/metrics/meter_provider.h"
# include "opentelemetry/sdk/metrics/metric_reader.h"

# include <gtest/gtest.h>

using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationscope;
using namespace opentelemetry::sdk::metrics;

class MockMetricReader : public MetricReader
{
public:
MockMetricReader() = default;
AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept override
{
return AggregationTemporality::kCumulative;
}
bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; }
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; }
void OnInitialized() noexcept override {}
};

namespace
{
nostd::shared_ptr<metrics::Meter> InitMeter(MetricReader **metricReaderPtr)
{
static std::shared_ptr<metrics::MeterProvider> provider(new MeterProvider());
std::unique_ptr<MetricReader> metric_reader(new MockMetricReader());
*metricReaderPtr = metric_reader.get();
auto p = std::static_pointer_cast<MeterProvider>(provider);
p->AddMetricReader(std::move(metric_reader));
auto meter = provider->GetMeter("meter_name");
return meter;
}
} // namespace

void asyc_generate_measurements(opentelemetry::metrics::ObserverResult observer, void *state)
{
auto observer_long =
nostd::get<nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<long>>>(observer);
observer_long->Observe(10l);
}

TEST(MeterTest, BasicAsyncTests)
{
MetricReader *metric_reader_ptr = nullptr;
auto meter = InitMeter(&metric_reader_ptr);
auto observable_counter = meter->CreateLongObservableCounter("observable_counter");
observable_counter->AddCallback(asyc_generate_measurements, nullptr);

size_t count = 0;
metric_reader_ptr->Collect([&count](ResourceMetrics &metric_data) {
EXPECT_EQ(metric_data.scope_metric_data_.size(), 1);
if (metric_data.scope_metric_data_.size())
{
EXPECT_EQ(metric_data.scope_metric_data_[0].metric_data_.size(), 1);
if (metric_data.scope_metric_data_.size())
{
count += metric_data.scope_metric_data_[0].metric_data_.size();
EXPECT_EQ(count, 1);
}
}
return true;
});
}

#endif

0 comments on commit 73c1cb3

Please sign in to comment.