Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix observable Gauge metrics generation #1651

Merged
merged 4 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions sdk/src/metrics/aggregation/lastvalue_aggregation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ void LongLastValueAggregation::Aggregate(long value,
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
point_data_.is_lastvalue_valid_ = true;
point_data_.value_ = value;
point_data_.sample_ts_ = std::chrono::system_clock::now();
}

std::unique_ptr<Aggregation> LongLastValueAggregation::Merge(
Expand Down Expand Up @@ -93,6 +94,7 @@ void DoubleLastValueAggregation::Aggregate(double value,
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
point_data_.is_lastvalue_valid_ = true;
point_data_.value_ = value;
point_data_.sample_ts_ = std::chrono::system_clock::now();
}

std::unique_ptr<Aggregation> DoubleLastValueAggregation::Merge(
Expand All @@ -102,12 +104,12 @@ std::unique_ptr<Aggregation> DoubleLastValueAggregation::Merge(
nostd::get<LastValuePointData>(delta.ToPoint()).sample_ts_.time_since_epoch())
{
LastValuePointData merge_data = std::move(nostd::get<LastValuePointData>(ToPoint()));
return std::unique_ptr<Aggregation>(new LongLastValueAggregation(std::move(merge_data)));
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation(std::move(merge_data)));
}
else
{
LastValuePointData merge_data = std::move(nostd::get<LastValuePointData>(delta.ToPoint()));
return std::unique_ptr<Aggregation>(new LongLastValueAggregation(std::move(merge_data)));
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation(std::move(merge_data)));
}
}

Expand All @@ -118,12 +120,12 @@ std::unique_ptr<Aggregation> DoubleLastValueAggregation::Diff(
nostd::get<LastValuePointData>(next.ToPoint()).sample_ts_.time_since_epoch())
{
LastValuePointData diff_data = std::move(nostd::get<LastValuePointData>(ToPoint()));
return std::unique_ptr<Aggregation>(new LongLastValueAggregation(std::move(diff_data)));
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation(std::move(diff_data)));
}
else
{
LastValuePointData diff_data = std::move(nostd::get<LastValuePointData>(next.ToPoint()));
return std::unique_ptr<Aggregation>(new LongLastValueAggregation(std::move(diff_data)));
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation(std::move(diff_data)));
}
}

Expand Down
122 changes: 82 additions & 40 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,49 +46,12 @@ class MockCollectorHandle : public CollectorHandle
class WritableMetricStorageTestFixture : public ::testing::TestWithParam<AggregationTemporality>
{};

class MeasurementFetcher
{
public:
static void Fetcher(opentelemetry::metrics::ObserverResult observer_result, void * /*state*/)
{
fetch_count++;
if (fetch_count == 1)
{
opentelemetry::nostd::get<0>(observer_result)->Observe(20l, {{"RequestType", "GET"}});
opentelemetry::nostd::get<0>(observer_result)->Observe(10l, {{"RequestType", "PUT"}});
number_of_get += 20l;
number_of_put += 10l;
}
else if (fetch_count == 2)
{
opentelemetry::nostd::get<0>(observer_result)->Observe(40l, {{"RequestType", "GET"}});
opentelemetry::nostd::get<0>(observer_result)->Observe(20l, {{"RequestType", "PUT"}});
number_of_get += 40l;
number_of_put += 20l;
}
}

static void init_values()
{
fetch_count = 0;
number_of_get = 0;
number_of_put = 0;
}

static size_t fetch_count;
static long number_of_get;
static long number_of_put;
static const size_t number_of_attributes = 2;
};

size_t MeasurementFetcher::fetch_count;
long MeasurementFetcher::number_of_get;
long MeasurementFetcher::number_of_put;
const size_t MeasurementFetcher::number_of_attributes;
class WritableMetricStorageTestObservableGaugeFixture
: public ::testing::TestWithParam<AggregationTemporality>
{};

TEST_P(WritableMetricStorageTestFixture, TestAggregation)
{
MeasurementFetcher::init_values();
AggregationTemporality temporality = GetParam();

InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kObservableCounter,
Expand Down Expand Up @@ -180,4 +143,83 @@ INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
::testing::Values(AggregationTemporality::kCumulative,
AggregationTemporality::kDelta));

TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
{
AggregationTemporality temporality = GetParam();

InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kObservableGauge,
InstrumentValueType::kLong};

auto sdk_start_ts = std::chrono::system_clock::now();
// Some computation here
auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5);

std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

std::unique_ptr<AttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kLastValue, default_attributes_processor.get(),
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig>{});
long freq_cpu0 = 3l;
long freq_cpu1 = 5l;
size_t attribute_count = 2;
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements1 = {
{{{"CPU", "0"}}, freq_cpu0}, {{{"CPU", "1"}}, freq_cpu1}};
storage.RecordLong(measurements1,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));

storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<LastValuePointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(data_attr.attributes.find("CPU")->second) ==
"0")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), freq_cpu0);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("CPU")->second) == "1")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), freq_cpu1);
}
}
return true;
});

freq_cpu0 = 6l;
freq_cpu1 = 8l;

std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements2 = {
{{{"CPU", "0"}}, freq_cpu0}, {{{"CPU", "1"}}, freq_cpu1}};
storage.RecordLong(measurements2,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<LastValuePointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(data_attr.attributes.find("CPU")->second) ==
"0")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), freq_cpu0);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("CPU")->second) == "1")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), freq_cpu1);
}
}
return true;
});
}

INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestObservableGaugeFixtureLong,
WritableMetricStorageTestObservableGaugeFixture,
::testing::Values(AggregationTemporality::kCumulative,
AggregationTemporality::kDelta));

#endif