From 0563a71ce3ea2c580f535996aeb8ce9246f2d703 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 18 Sep 2023 12:57:28 -0700 Subject: [PATCH] Fix Observable Counters/UpDownCounters (#2298) --- .../metrics/aggregation/default_aggregation.h | 47 ++++++--- sdk/test/metrics/async_metric_storage_test.cc | 95 +++++++++++++++++++ 2 files changed, 130 insertions(+), 12 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index 203a2d32b9..6ae111fd85 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -28,20 +28,16 @@ class DefaultAggregation const opentelemetry::sdk::metrics::InstrumentDescriptor &instrument_descriptor, const AggregationConfig *aggregation_config) { - switch (instrument_descriptor.type_) + bool is_monotonic = true; + auto aggr_type = GetDefaultAggregationType(instrument_descriptor.type_, is_monotonic); + switch (aggr_type) { - case InstrumentType::kCounter: - case InstrumentType::kObservableCounter: + case AggregationType::kSum: return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) - ? std::move(std::unique_ptr(new LongSumAggregation(true))) + ? std::move(std::unique_ptr(new LongSumAggregation(is_monotonic))) : std::move(std::unique_ptr(new DoubleSumAggregation(true))); - case InstrumentType::kUpDownCounter: - case InstrumentType::kObservableUpDownCounter: - return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) - ? std::move(std::unique_ptr(new LongSumAggregation(false))) - : std::move(std::unique_ptr(new DoubleSumAggregation(false))); break; - case InstrumentType::kHistogram: { + case AggregationType::kHistogram: { if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) { return (std::unique_ptr(new LongHistogramAggregation(aggregation_config))); @@ -53,7 +49,7 @@ class DefaultAggregation break; } - case InstrumentType::kObservableGauge: + case AggregationType::kLastValue: return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) ? std::move(std::unique_ptr(new LongLastValueAggregation())) : std::move(std::unique_ptr(new DoubleLastValueAggregation())); @@ -121,6 +117,11 @@ class DefaultAggregation const Aggregation &to_copy) { const PointType point_data = to_copy.ToPoint(); + bool is_monotonic = true; + if (aggregation_type == AggregationType::kDefault) + { + aggregation_type = GetDefaultAggregationType(instrument_descriptor.type_, is_monotonic); + } switch (aggregation_type) { case AggregationType::kDrop: @@ -159,7 +160,29 @@ class DefaultAggregation new DoubleSumAggregation(nostd::get(point_data))); } default: - return DefaultAggregation::CreateAggregation(instrument_descriptor, nullptr); + return nullptr; // won't reach here + } + } + + static AggregationType GetDefaultAggregationType(InstrumentType instrument_type, + bool &is_monotonic) + { + is_monotonic = false; + switch (instrument_type) + { + case InstrumentType::kCounter: + case InstrumentType::kObservableCounter: + is_monotonic = true; + return AggregationType::kSum; + case InstrumentType::kUpDownCounter: + case InstrumentType::kObservableUpDownCounter: + return AggregationType::kSum; + case InstrumentType::kHistogram: + return AggregationType::kHistogram; + case InstrumentType::kObservableGauge: + return AggregationType::kLastValue; + default: + return AggregationType::kDrop; } } }; diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 0ce91dafe1..1fef4e9de3 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -30,6 +30,10 @@ using M = std::map; class WritableMetricStorageTestFixture : public ::testing::TestWithParam {}; +class WritableMetricStorageTestUpDownFixture + : public ::testing::TestWithParam +{}; + class WritableMetricStorageTestObservableGaugeFixture : public ::testing::TestWithParam {}; @@ -124,6 +128,97 @@ INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, ::testing::Values(AggregationTemporality::kCumulative, AggregationTemporality::kDelta)); +TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation) +{ + AggregationTemporality temporality = GetParam(); + + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", + InstrumentType::kObservableUpDownCounter, + InstrumentValueType::kLong}; + + auto sdk_start_ts = std::chrono::system_clock::now(); + // Some computation here + auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5); + + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + + opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kDefault, + nullptr); + int64_t get_count1 = 20; + int64_t put_count1 = 10; + std::unordered_map measurements1 = { + {{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}}; + storage.RecordLong(measurements1, + opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())); + + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) { + for (const auto &data_attr : metric_data.point_data_attr_) + { + const auto &data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count1); + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count1); + } + } + return true; + }); + // subsequent recording after collection shouldn't fail + // monotonic increasing values; + int64_t get_count2 = -50; + int64_t put_count2 = -70; + + std::unordered_map measurements2 = { + {{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}}; + storage.RecordLong(measurements2, + opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())); + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) { + for (const auto &data_attr : metric_data.point_data_attr_) + { + const auto &data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count2); + } + else + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count2 - get_count1); + } + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count2); + } + else + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count2 - put_count1); + } + } + } + return true; + }); +} + +INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestUpDownLong, + WritableMetricStorageTestUpDownFixture, + ::testing::Values(AggregationTemporality::kCumulative, + AggregationTemporality::kDelta)); + TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation) { AggregationTemporality temporality = GetParam();