Skip to content

Commit

Permalink
fix Histogram crash (open-telemetry#1685)
Browse files Browse the repository at this point in the history
  • Loading branch information
esigo authored and yxue committed Dec 5, 2022
1 parent bee0d8b commit fccc108
Show file tree
Hide file tree
Showing 14 changed files with 82 additions and 80 deletions.
9 changes: 4 additions & 5 deletions examples/metrics_simple/metrics_ostream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ void initMetrics(const std::string &name)
std::unique_ptr<metric_sdk::MeterSelector> histogram_meter_selector{
new metric_sdk::MeterSelector(name, version, schema)};
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig> aggregation_config{
new opentelemetry::sdk::metrics::HistogramAggregationConfig<double>};
static_cast<opentelemetry::sdk::metrics::HistogramAggregationConfig<double> *>(
aggregation_config.get())
->boundaries_ =
std::list<double>{0.0, 50.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 10000.0};
new opentelemetry::sdk::metrics::HistogramAggregationConfig};
static_cast<opentelemetry::sdk::metrics::HistogramAggregationConfig *>(aggregation_config.get())
->boundaries_ = std::list<double>{0.0, 50.0, 100.0, 250.0, 500.0, 750.0,
1000.0, 2500.0, 5000.0, 10000.0, 20000.0};
std::unique_ptr<metric_sdk::View> histogram_view{new metric_sdk::View{
name, "description", metric_sdk::AggregationType::kHistogram, aggregation_config}};
p->AddView(std::move(histogram_instrument_selector), std::move(histogram_meter_selector),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class AggregationConfig
virtual ~AggregationConfig() = default;
};

template <typename T>
class HistogramAggregationConfig : public AggregationConfig
{
public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class DefaultAggregation
public:
static std::unique_ptr<Aggregation> CreateAggregation(
const opentelemetry::sdk::metrics::InstrumentDescriptor &instrument_descriptor,
const opentelemetry::sdk::metrics::AggregationConfig *aggregation_config)
const AggregationConfig *aggregation_config)
{
switch (instrument_descriptor.type_)
{
Expand All @@ -40,14 +40,15 @@ class DefaultAggregation
: std::move(std::unique_ptr<Aggregation>(new DoubleSumAggregation()));
break;
case InstrumentType::kHistogram: {
return (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
? std::move(std::unique_ptr<Aggregation>(new LongHistogramAggregation(
static_cast<
const opentelemetry::sdk::metrics::HistogramAggregationConfig<long> *>(
aggregation_config))))
: std::move(std::unique_ptr<Aggregation>(new DoubleHistogramAggregation(
static_cast<const opentelemetry::sdk::metrics::HistogramAggregationConfig<
double> *>(aggregation_config))));
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return (std::unique_ptr<Aggregation>(new LongHistogramAggregation(aggregation_config)));
}
else
{
return (std::unique_ptr<Aggregation>(new DoubleHistogramAggregation(aggregation_config)));
}

break;
}
case InstrumentType::kObservableGauge:
Expand All @@ -60,8 +61,10 @@ class DefaultAggregation
};
}

static std::unique_ptr<Aggregation> CreateAggregation(AggregationType aggregation_type,
InstrumentDescriptor instrument_descriptor)
static std::unique_ptr<Aggregation> CreateAggregation(
AggregationType aggregation_type,
InstrumentDescriptor instrument_descriptor,
const AggregationConfig *aggregation_config = nullptr)
{
switch (aggregation_type)
{
Expand All @@ -71,11 +74,11 @@ class DefaultAggregation
case AggregationType::kHistogram:
if (instrument_descriptor.value_type_ == InstrumentValueType::kLong)
{
return std::unique_ptr<Aggregation>(new LongHistogramAggregation());
return std::unique_ptr<Aggregation>(new LongHistogramAggregation(aggregation_config));
}
else
{
return std::unique_ptr<Aggregation>(new DoubleHistogramAggregation());
return std::unique_ptr<Aggregation>(new DoubleHistogramAggregation(aggregation_config));
}
break;
case AggregationType::kLastValue:
Expand All @@ -99,7 +102,7 @@ class DefaultAggregation
}
break;
default:
return DefaultAggregation::CreateAggregation(instrument_descriptor, nullptr);
return DefaultAggregation::CreateAggregation(instrument_descriptor, aggregation_config);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

#pragma once
#include <memory>
#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/sdk/metrics/aggregation/aggregation.h"
Expand All @@ -18,7 +19,7 @@ namespace metrics
class LongHistogramAggregation : public Aggregation
{
public:
LongHistogramAggregation(const HistogramAggregationConfig<long> *aggregation_config = nullptr);
LongHistogramAggregation(const AggregationConfig *aggregation_config = nullptr);
LongHistogramAggregation(HistogramPointData &&);
LongHistogramAggregation(const HistogramPointData &);

Expand Down Expand Up @@ -48,8 +49,7 @@ class LongHistogramAggregation : public Aggregation
class DoubleHistogramAggregation : public Aggregation
{
public:
DoubleHistogramAggregation(
const HistogramAggregationConfig<double> *aggregation_config = nullptr);
DoubleHistogramAggregation(const AggregationConfig *aggregation_config = nullptr);
DoubleHistogramAggregation(HistogramPointData &&);
DoubleHistogramAggregation(const HistogramPointData &);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
AsyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor,
nostd::shared_ptr<AggregationConfig> aggregation_config,
const AggregationConfig *aggregation_config,
void *state = nullptr)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor,
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir,
nostd::shared_ptr<AggregationConfig> aggregation_config)
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
attributes_hashmap_(new AttributesHashMap()),
Expand All @@ -40,8 +40,9 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
temporal_metric_storage_(instrument_descriptor, aggregation_config)

{
create_default_aggregation_ = [&]() -> std::unique_ptr<Aggregation> {
return DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
create_default_aggregation_ = [&, aggregation_config]() -> std::unique_ptr<Aggregation> {
return DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_,
aggregation_config);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class TemporalMetricStorage
{
public:
TemporalMetricStorage(InstrumentDescriptor instrument_descriptor,
nostd::shared_ptr<AggregationConfig> aggregation_config);
const AggregationConfig *aggregation_config);

bool buildMetrics(CollectorHandle *collector,
nostd::span<std::shared_ptr<CollectorHandle>> collectors,
Expand All @@ -46,7 +46,7 @@ class TemporalMetricStorage

// Lock while building metrics
mutable opentelemetry::common::SpinLockMutex lock_;
const nostd::shared_ptr<AggregationConfig> aggregation_config_;
const AggregationConfig *aggregation_config_;
};
} // namespace metrics
} // namespace sdk
Expand Down
8 changes: 4 additions & 4 deletions sdk/include/opentelemetry/sdk/metrics/view/view.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class View
View(const std::string &name,
const std::string &description = "",
AggregationType aggregation_type = AggregationType::kDefault,
std::shared_ptr<AggregationConfig> aggregation_config = std::shared_ptr<AggregationConfig>{},
std::shared_ptr<AggregationConfig> aggregation_config = nullptr,
std::unique_ptr<opentelemetry::sdk::metrics::AttributesProcessor> attributes_processor =
std::unique_ptr<opentelemetry::sdk::metrics::AttributesProcessor>(
new opentelemetry::sdk::metrics::DefaultAttributesProcessor()))
Expand All @@ -45,9 +45,9 @@ class View

virtual AggregationType GetAggregationType() const noexcept { return aggregation_type_; }

virtual nostd::shared_ptr<AggregationConfig> GetAggregationConfig() const noexcept
virtual AggregationConfig *GetAggregationConfig() const noexcept
{
return aggregation_config_;
return aggregation_config_.get();
}

virtual const opentelemetry::sdk::metrics::AttributesProcessor &GetAttributesProcessor()
Expand All @@ -60,7 +60,7 @@ class View
std::string name_;
std::string description_;
AggregationType aggregation_type_;
nostd::shared_ptr<AggregationConfig> aggregation_config_;
std::shared_ptr<AggregationConfig> aggregation_config_;
std::unique_ptr<opentelemetry::sdk::metrics::AttributesProcessor> attributes_processor_;
};
} // namespace metrics
Expand Down
32 changes: 19 additions & 13 deletions sdk/src/metrics/aggregation/histogram_aggregation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# include <algorithm>
# include <iomanip>
# include <limits>
# include <memory>
# include "opentelemetry/version.h"

# include <mutex>
Expand All @@ -15,22 +16,22 @@ namespace sdk
namespace metrics
{

LongHistogramAggregation::LongHistogramAggregation(
const HistogramAggregationConfig<long> *aggregation_config)
LongHistogramAggregation::LongHistogramAggregation(const AggregationConfig *aggregation_config)
{
if (aggregation_config && aggregation_config->boundaries_.size())
auto ac = static_cast<const HistogramAggregationConfig *>(aggregation_config);
if (ac && ac->boundaries_.size())
{
point_data_.boundaries_ = aggregation_config->boundaries_;
point_data_.boundaries_ = ac->boundaries_;
}
else
{
point_data_.boundaries_ = {0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0,
500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, 10000.0};
}

if (aggregation_config)
if (ac)
{
record_min_max_ = aggregation_config->record_min_max_;
record_min_max_ = ac->record_min_max_;
}
point_data_.counts_ = std::vector<uint64_t>(point_data_.boundaries_.size() + 1, 0);
point_data_.sum_ = 0l;
Expand Down Expand Up @@ -98,21 +99,21 @@ PointType LongHistogramAggregation::ToPoint() const noexcept
return point_data_;
}

DoubleHistogramAggregation::DoubleHistogramAggregation(
const HistogramAggregationConfig<double> *aggregation_config)
DoubleHistogramAggregation::DoubleHistogramAggregation(const AggregationConfig *aggregation_config)
{
if (aggregation_config && aggregation_config->boundaries_.size())
auto ac = static_cast<const HistogramAggregationConfig *>(aggregation_config);
if (ac && ac->boundaries_.size())
{
point_data_.boundaries_ = aggregation_config->boundaries_;
point_data_.boundaries_ = ac->boundaries_;
}
else
{
point_data_.boundaries_ =
std::list<double>{0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 1000.0};
}
if (aggregation_config)
if (ac)
{
record_min_max_ = aggregation_config->record_min_max_;
record_min_max_ = ac->record_min_max_;
}
point_data_.counts_ = std::vector<uint64_t>(point_data_.boundaries_.size() + 1, 0);
point_data_.sum_ = 0.0;
Expand Down Expand Up @@ -159,7 +160,12 @@ std::unique_ptr<Aggregation> DoubleHistogramAggregation::Merge(
auto curr_value = nostd::get<HistogramPointData>(ToPoint());
auto delta_value = nostd::get<HistogramPointData>(
(static_cast<const DoubleHistogramAggregation &>(delta).ToPoint()));
DoubleHistogramAggregation *aggr = new DoubleHistogramAggregation();
std::shared_ptr<AggregationConfig> aggregation_config(new HistogramAggregationConfig);
static_cast<opentelemetry::sdk::metrics::HistogramAggregationConfig *>(aggregation_config.get())
->boundaries_ = curr_value.boundaries_;
static_cast<opentelemetry::sdk::metrics::HistogramAggregationConfig *>(aggregation_config.get())
->record_min_max_ = record_min_max_;
DoubleHistogramAggregation *aggr = new DoubleHistogramAggregation(aggregation_config.get());
HistogramMerge<double>(curr_value, delta_value, aggr->point_data_);
return std::unique_ptr<Aggregation>(aggr);
}
Expand Down
36 changes: 18 additions & 18 deletions sdk/src/metrics/state/temporal_metric_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ namespace sdk
namespace metrics
{

TemporalMetricStorage::TemporalMetricStorage(
InstrumentDescriptor instrument_descriptor,
nostd::shared_ptr<AggregationConfig> aggregation_config)
TemporalMetricStorage::TemporalMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationConfig *aggregation_config)
: instrument_descriptor_(instrument_descriptor), aggregation_config_(aggregation_config)
{}

Expand Down Expand Up @@ -67,7 +66,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
else
{
merged_metrics->Set(attributes, DefaultAggregation::CreateAggregation(
instrument_descriptor_, aggregation_config_.get())
instrument_descriptor_, aggregation_config_)
->Merge(aggregation));
}
return true;
Expand All @@ -90,20 +89,21 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector,
if (aggregation_temporarily == AggregationTemporality::kCumulative)
{
// merge current delta to previous cumulative
last_aggr_hashmap->GetAllEnteries(
[&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
auto def_agg = DefaultAggregation::CreateAggregation(instrument_descriptor_, nullptr);
merged_metrics->Set(attributes, def_agg->Merge(aggregation));
}
return true;
});
last_aggr_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes,
Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
auto def_agg =
DefaultAggregation::CreateAggregation(instrument_descriptor_, aggregation_config_);
merged_metrics->Set(attributes, def_agg->Merge(aggregation));
}
return true;
});
}
last_reported_metrics_[collector] =
LastReportedMetrics{std::move(merged_metrics), collection_ts};
Expand Down
8 changes: 4 additions & 4 deletions sdk/test/metrics/aggregation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ TEST(Aggregation, LongHistogramAggregation)

TEST(Aggregation, LongHistogramAggregationBoundaries)
{
nostd::shared_ptr<opentelemetry::sdk::metrics::HistogramAggregationConfig<long>>
aggregation_config{new opentelemetry::sdk::metrics::HistogramAggregationConfig<long>};
std::shared_ptr<opentelemetry::sdk::metrics::HistogramAggregationConfig> aggregation_config{
new opentelemetry::sdk::metrics::HistogramAggregationConfig};
std::list<double> user_boundaries = {0.0, 50.0, 100.0, 250.0, 500.0,
750.0, 1000.0, 2500.0, 5000.0, 10000.0};
aggregation_config->boundaries_ = user_boundaries;
Expand All @@ -145,8 +145,8 @@ TEST(Aggregation, LongHistogramAggregationBoundaries)

TEST(Aggregation, DoubleHistogramAggregationBoundaries)
{
nostd::shared_ptr<opentelemetry::sdk::metrics::HistogramAggregationConfig<double>>
aggregation_config{new opentelemetry::sdk::metrics::HistogramAggregationConfig<double>};
std::shared_ptr<opentelemetry::sdk::metrics::HistogramAggregationConfig> aggregation_config{
new opentelemetry::sdk::metrics::HistogramAggregationConfig};
std::list<double> user_boundaries = {0.0, 50.0, 100.0, 250.0, 500.0,
750.0, 1000.0, 2500.0, 5000.0, 10000.0};
aggregation_config->boundaries_ = user_boundaries;
Expand Down
6 changes: 2 additions & 4 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
std::unique_ptr<AttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kSum, default_attributes_processor.get(),
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig>{});
instr_desc, AggregationType::kSum, default_attributes_processor.get(), nullptr);
long get_count1 = 20l;
long put_count1 = 10l;
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements1 = {
Expand Down Expand Up @@ -161,8 +160,7 @@ TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
std::unique_ptr<AttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kLastValue, default_attributes_processor.get(),
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig>{});
instr_desc, AggregationType::kLastValue, default_attributes_processor.get(), nullptr);
long freq_cpu0 = 3l;
long freq_cpu1 = 5l;
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements1 = {
Expand Down
6 changes: 2 additions & 4 deletions sdk/test/metrics/sync_metric_storage_counter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ TEST_P(WritableMetricStorageTestFixture, LongSumAggregation)
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::SyncMetricStorage storage(
instr_desc, AggregationType::kSum, default_attributes_processor.get(),
ExemplarReservoir::GetNoExemplarReservoir(),
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig>{});
ExemplarReservoir::GetNoExemplarReservoir(), nullptr);

storage.RecordLong(10l, KeyValueIterableView<std::map<std::string, std::string>>(attributes_get),
opentelemetry::context::Context{});
Expand Down Expand Up @@ -188,8 +187,7 @@ TEST_P(WritableMetricStorageTestFixture, DoubleSumAggregation)
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::SyncMetricStorage storage(
instr_desc, AggregationType::kSum, default_attributes_processor.get(),
ExemplarReservoir::GetNoExemplarReservoir(),
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig>{});
ExemplarReservoir::GetNoExemplarReservoir(), nullptr);

storage.RecordDouble(10.0,
KeyValueIterableView<std::map<std::string, std::string>>(attributes_get),
Expand Down
Loading

0 comments on commit fccc108

Please sign in to comment.