Skip to content

Commit f21b3a0

Browse files
authored
Fix #1588 - Observable Gauge does not reflect updated values, and send the old value always (#1641)
1 parent d127140 commit f21b3a0

File tree

6 files changed

+61
-18
lines changed

6 files changed

+61
-18
lines changed

examples/common/metrics_foo_library/foo_library.cc

+5-2
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,16 @@ class MeasurementFetcher
3939
if (nostd::holds_alternative<
4040
nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<double>>>(observer_result))
4141
{
42-
double val = (rand() % 700) + 1.1;
42+
double random_incr = (rand() % 5) + 1.1;
43+
value_ += random_incr;
4344
nostd::get<nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<double>>>(
4445
observer_result)
45-
->Observe(val /*, labelkv */);
46+
->Observe(value_ /*, labelkv */);
4647
}
4748
}
49+
static double value_;
4850
};
51+
double MeasurementFetcher::value_ = 0.0;
4952
} // namespace
5053

5154
void foo_library::counter_example(const std::string &name)

examples/metrics_simple/metrics_ostream.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ void initMetrics(const std::string &name)
6262
std::unique_ptr<metric_sdk::MeterSelector> observable_meter_selector{
6363
new metric_sdk::MeterSelector(name, version, schema)};
6464
std::unique_ptr<metric_sdk::View> observable_sum_view{
65-
new metric_sdk::View{name, "description", metric_sdk::AggregationType::kSum}};
65+
new metric_sdk::View{name, "test_description", metric_sdk::AggregationType::kSum}};
6666
p->AddView(std::move(observable_instrument_selector), std::move(observable_meter_selector),
6767
std::move(observable_sum_view));
6868

sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
4343
void Record(const std::unordered_map<MetricAttributes, T, AttributeHashGenerator> &measurements,
4444
opentelemetry::common::SystemTimestamp /* observation_time */) noexcept
4545
{
46-
// process the read measurements - aggregate and store in hashmap
46+
// Async counter always record monotonically increasing values, and the
47+
// exporter/reader can request either for delta or cumulative value.
48+
// So we convert the async counter value to delta before passing it to temporal storage.
4749
std::lock_guard<opentelemetry::common::SpinLockMutex> guard(hashmap_lock_);
4850
for (auto &measurement : measurements)
4951
{
@@ -53,13 +55,14 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
5355
if (prev)
5456
{
5557
auto delta = prev->Diff(*aggr);
56-
cumulative_hash_map_->Set(measurement.first,
57-
DefaultAggregation::CloneAggregation(
58-
aggregation_type_, instrument_descriptor_, *delta));
58+
// store received value in cumulative map, and the diff in delta map (to pass it to temporal
59+
// storage)
60+
cumulative_hash_map_->Set(measurement.first, std::move(aggr));
5961
delta_hash_map_->Set(measurement.first, std::move(delta));
6062
}
6163
else
6264
{
65+
// store received value in cumulative and delta map.
6366
cumulative_hash_map_->Set(
6467
measurement.first,
6568
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr));

sdk/src/metrics/meter.cc

-1
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,6 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
293293
std::vector<MetricData> Meter::Collect(CollectorHandle *collector,
294294
opentelemetry::common::SystemTimestamp collect_ts) noexcept
295295
{
296-
297296
observable_registry_->Observe(collect_ts);
298297
std::vector<MetricData> metric_data_list;
299298
auto ctx = meter_context_.lock();

sdk/src/metrics/state/temporal_metric_storage.cc

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
3434
opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts;
3535
AggregationTemporality aggregation_temporarily =
3636
collector->GetAggregationTemporality(instrument_descriptor_.type_);
37+
3738
if (delta_metrics->Size())
3839
{
3940
for (auto &col : collectors)

sdk/test/metrics/async_metric_storage_test.cc

+47-10
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,12 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
107107
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
108108
instr_desc, AggregationType::kSum, default_attributes_processor.get(),
109109
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig>{});
110-
long get_count = 20l;
111-
long put_count = 10l;
112-
size_t attribute_count = 2;
113-
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements = {
114-
{{{"RequestType", "GET"}}, get_count}, {{{"RequestType", "PUT"}}, put_count}};
115-
storage.RecordLong(measurements,
110+
long get_count1 = 20l;
111+
long put_count1 = 10l;
112+
size_t attribute_count = 2;
113+
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements1 = {
114+
{{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}};
115+
storage.RecordLong(measurements1,
116116
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
117117

118118
storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts,
@@ -123,20 +123,57 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
123123
if (opentelemetry::nostd::get<std::string>(
124124
data_attr.attributes.find("RequestType")->second) == "GET")
125125
{
126-
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count);
126+
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count1);
127127
}
128128
else if (opentelemetry::nostd::get<std::string>(
129129
data_attr.attributes.find("RequestType")->second) == "PUT")
130130
{
131-
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count);
131+
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count1);
132132
}
133133
}
134134
return true;
135135
});
136136
// subsequent recording after collection shouldn't fail
137-
storage.RecordLong(measurements,
137+
// monotonic increasing values;
138+
long get_count2 = 50l;
139+
long put_count2 = 70l;
140+
141+
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements2 = {
142+
{{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}};
143+
storage.RecordLong(measurements2,
138144
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
139-
EXPECT_EQ(MeasurementFetcher::number_of_attributes, attribute_count);
145+
storage.Collect(
146+
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
147+
for (auto data_attr : data.point_data_attr_)
148+
{
149+
auto data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
150+
if (opentelemetry::nostd::get<std::string>(
151+
data_attr.attributes.find("RequestType")->second) == "GET")
152+
{
153+
if (temporality == AggregationTemporality::kCumulative)
154+
{
155+
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count2);
156+
}
157+
else
158+
{
159+
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), get_count2 - get_count1);
160+
}
161+
}
162+
else if (opentelemetry::nostd::get<std::string>(
163+
data_attr.attributes.find("RequestType")->second) == "PUT")
164+
{
165+
if (temporality == AggregationTemporality::kCumulative)
166+
{
167+
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count2);
168+
}
169+
else
170+
{
171+
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), put_count2 - put_count1);
172+
}
173+
}
174+
}
175+
return true;
176+
});
140177
}
141178

142179
INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,

0 commit comments

Comments
 (0)