From a88bbcbd7202cf6d1b0db38466bd3e9fb8608e9d Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 6 Sep 2023 16:34:11 -0700 Subject: [PATCH 1/4] fix async --- .../metrics/aggregation/default_aggregation.h | 47 +++++++--- sdk/test/metrics/async_metric_storage_test.cc | 94 ++++++++++++++++++- 2 files changed, 127 insertions(+), 14 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index 203a2d32b9..09c6476928 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -28,20 +28,16 @@ class DefaultAggregation const opentelemetry::sdk::metrics::InstrumentDescriptor &instrument_descriptor, const AggregationConfig *aggregation_config) { - switch (instrument_descriptor.type_) + bool is_monotonic = true; + auto aggr_type = GetDefaultAggregationType(instrument_descriptor.type_, is_monotonic); + switch (aggr_type) { - case InstrumentType::kCounter: - case InstrumentType::kObservableCounter: + case AggregationType::kSum: return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) - ? std::move(std::unique_ptr(new LongSumAggregation(true))) + ? std::move(std::unique_ptr(new LongSumAggregation(is_monotonic))) : std::move(std::unique_ptr(new DoubleSumAggregation(true))); - case InstrumentType::kUpDownCounter: - case InstrumentType::kObservableUpDownCounter: - return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) - ? std::move(std::unique_ptr(new LongSumAggregation(false))) - : std::move(std::unique_ptr(new DoubleSumAggregation(false))); break; - case InstrumentType::kHistogram: { + case AggregationType::kHistogram: { if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) { return (std::unique_ptr(new LongHistogramAggregation(aggregation_config))); @@ -53,7 +49,7 @@ class DefaultAggregation break; } - case InstrumentType::kObservableGauge: + case AggregationType::kLastValue: return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) ? std::move(std::unique_ptr(new LongLastValueAggregation())) : std::move(std::unique_ptr(new DoubleLastValueAggregation())); @@ -121,6 +117,11 @@ class DefaultAggregation const Aggregation &to_copy) { const PointType point_data = to_copy.ToPoint(); + bool is_monotonic = true; + if (aggregation_type == AggregationType::kDefault) + { + aggregation_type = GetDefaultAggregationType(instrument_descriptor.type_, is_monotonic); + } switch (aggregation_type) { case AggregationType::kDrop: @@ -159,7 +160,29 @@ class DefaultAggregation new DoubleSumAggregation(nostd::get(point_data))); } default: - return DefaultAggregation::CreateAggregation(instrument_descriptor, nullptr); + return nullptr; // won't reach here + } + } + + static AggregationType GetDefaultAggregationType(InstrumentType instrument_type, + bool &is_monotonic) + { + is_monotonic = true; + switch (instrument_type) + { + case InstrumentType::kCounter: + case InstrumentType::kObservableCounter: + is_monotonic = false; + return AggregationType::kSum; + case InstrumentType::kUpDownCounter: + case InstrumentType::kObservableUpDownCounter: + return AggregationType::kSum; + case InstrumentType::kHistogram: + return AggregationType::kHistogram; + case InstrumentType::kObservableGauge: + return AggregationType::kLastValue; + default: + return AggregationType::kDrop; } } }; diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 0ce91dafe1..4ddf68b996 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -30,6 +30,10 @@ using M = std::map; class WritableMetricStorageTestFixture : public ::testing::TestWithParam {}; +class WritableMetricStorageTestUpDownFixture + : public ::testing::TestWithParam +{}; + class WritableMetricStorageTestObservableGaugeFixture : public ::testing::TestWithParam {}; @@ -119,8 +123,94 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) }); } -INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, - WritableMetricStorageTestFixture, +TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation) +{ + AggregationTemporality temporality = GetParam(); + + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", + InstrumentType::kObservableUpDownCounter, + InstrumentValueType::kLong}; + + auto sdk_start_ts = std::chrono::system_clock::now(); + // Some computation here + auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5); + + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + + opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kDefault, + nullptr); + int64_t get_count1 = 20; + int64_t put_count1 = 10; + std::unordered_map measurements1 = { + {{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}}; + storage.RecordLong(measurements1, + opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())); + + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) { + for (const auto &data_attr : metric_data.point_data_attr_) + { + const auto &data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count1); + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count1); + } + } + return true; + }); + // subsequent recording after collection shouldn't fail + // monotonic increasing values; + int64_t get_count2 = 50; + int64_t put_count2 = 70; + + std::unordered_map measurements2 = { + {{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}}; + storage.RecordLong(measurements2, + opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())); + storage.Collect( + collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) { + for (const auto &data_attr : metric_data.point_data_attr_) + { + const auto &data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count2); + } + else + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count2 - get_count1); + } + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + if (temporality == AggregationTemporality::kCumulative) + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count2); + } + else + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count2 - put_count1); + } + } + } + return true; + }); +} + +INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestUpDownLong, + WritableMetricStorageTestUpDownFixture, ::testing::Values(AggregationTemporality::kCumulative, AggregationTemporality::kDelta)); From 24f0074426006dc862e10278413064be356a362b Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 6 Sep 2023 17:15:12 -0700 Subject: [PATCH 2/4] fix test --- sdk/test/metrics/async_metric_storage_test.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 4ddf68b996..6ca3abe529 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -123,6 +123,11 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) }); } +INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, + WritableMetricStorageTestFixture, + ::testing::Values(AggregationTemporality::kCumulative, + AggregationTemporality::kDelta)); + TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation) { AggregationTemporality temporality = GetParam(); From af38971610633da9b69cd02f55740e1672afbb92 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 7 Sep 2023 10:27:14 -0700 Subject: [PATCH 3/4] add test to fail --- sdk/test/metrics/async_metric_storage_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 6ca3abe529..1fef4e9de3 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -173,8 +173,8 @@ TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation) }); // subsequent recording after collection shouldn't fail // monotonic increasing values; - int64_t get_count2 = 50; - int64_t put_count2 = 70; + int64_t get_count2 = -50; + int64_t put_count2 = -70; std::unordered_map measurements2 = { {{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}}; From 7afca530cf2746b013fae6380cf2db2ef62aaa41 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 7 Sep 2023 10:51:32 -0700 Subject: [PATCH 4/4] fix is_monotonic logic --- .../sdk/metrics/aggregation/default_aggregation.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index 09c6476928..6ae111fd85 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -167,12 +167,12 @@ class DefaultAggregation static AggregationType GetDefaultAggregationType(InstrumentType instrument_type, bool &is_monotonic) { - is_monotonic = true; + is_monotonic = false; switch (instrument_type) { case InstrumentType::kCounter: case InstrumentType::kObservableCounter: - is_monotonic = false; + is_monotonic = true; return AggregationType::kSum; case InstrumentType::kUpDownCounter: case InstrumentType::kObservableUpDownCounter: