Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple async callbacks #1495

Merged
merged 32 commits into from
Aug 5, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
changes
lalitb committed Jul 25, 2022
commit 8d543350c7f3be1b1b26a35c4a49c61c480c2b91
Original file line number Diff line number Diff line change
@@ -21,43 +21,71 @@ namespace sdk
namespace metrics
{

template <class T>
class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStorage
{
public:
AsyncMetricStorage(
InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
// void (*measurement_callback)(opentelemetry::metrics::ObserverResult &, void *),
const AttributesProcessor *attributes_processor,
void *state = nullptr)
AsyncMetricStorage(InstrumentDescriptor instrument_descriptor,
const AggregationType aggregation_type,
const AttributesProcessor *attributes_processor,
void *state = nullptr)
: instrument_descriptor_(instrument_descriptor),
aggregation_type_{aggregation_type},
// measurement_collection_callback_{measurement_callback},
attributes_processor_{attributes_processor},
state_{state},
delta_hash_map_(new AttributesHashMap()),
cumulative_hash_map_(new AttributesHashMap()),
temporal_metric_storage_(instrument_descriptor)
{}

virtual void RecordLong(
template <class T>
void Record(const std::unordered_map<MetricAttributes, T, AttributeHashGenerator> &measurements,
opentelemetry::common::SystemTimestamp observation_time) noexcept
{
// std::shared_ptr<AttributesHashMap> delta_hash_map(new AttributesHashMap());
// process the read measurements - aggregate and store in hashmap
for (auto &measurement : measurements)
{
auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
aggr->Aggregate(measurement.second);
auto prev = cumulative_hash_map_->Get(measurement.first);
if (prev)
{
auto delta = prev->Diff(*aggr);
cumulative_hash_map_->Set(measurement.first,
DefaultAggregation::CloneAggregation(
aggregation_type_, instrument_descriptor_, *delta));
delta_hash_map_->Set(measurement.first, std::move(delta));
}
else
{
cumulative_hash_map_->Set(
measurement.first,
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr));
delta_hash_map_->Set(measurement.first, std::move(aggr));
}
}
}

void RecordLong(
const std::unordered_map<MetricAttributes, long, AttributeHashGenerator> &measurements,
opentelemetry::common::SystemTimestamp observation_time) noexcept override
{
if (instrument_descriptor_.value_type_ != InstrumentValueType::kLong)
{
return;
}
Record<long>(measurements, observation_time);
}

virtual void RecordDouble(
void RecordDouble(
const std::unordered_map<MetricAttributes, double, AttributeHashGenerator> &measurements,
opentelemetry::common::SystemTimestamp observation_time) noexcept override
{
if (instrument_descriptor_.value_type_ != InstrumentValueType::kDouble)
{
return;
}
Record<double>(measurements, observation_time);
}

bool Collect(CollectorHandle *collector,
@@ -66,37 +94,9 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
opentelemetry::common::SystemTimestamp collection_ts,
nostd::function_ref<bool(MetricData)> metric_collection_callback) noexcept override
{
nostd::shared_ptr<opentelemetry::sdk::metrics::ObserverResultT<T>> ob_res(
new opentelemetry::sdk::metrics::ObserverResultT<T>(nullptr));

// read the measurement using configured callback
// measurement_collection_callback_(ob_res, state_);
std::shared_ptr<AttributesHashMap> delta_hash_map(new AttributesHashMap());
// process the read measurements - aggregate and store in hashmap
for (auto &measurement : ob_res->GetMeasurements())
{
auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_);
aggr->Aggregate(measurement.second);
auto prev = cumulative_hash_map_->Get(measurement.first);
if (prev)
{
auto delta = prev->Diff(*aggr);
cumulative_hash_map_->Set(measurement.first,
DefaultAggregation::CloneAggregation(
aggregation_type_, instrument_descriptor_, *delta));
delta_hash_map->Set(measurement.first, std::move(delta));
}
else
{
cumulative_hash_map_->Set(
measurement.first,
DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr));
delta_hash_map->Set(measurement.first, std::move(aggr));
}
}

return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts,
std::move(delta_hash_map),
std::move(delta_hash_map_),
metric_collection_callback);
}

@@ -107,6 +107,7 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
const AttributesProcessor *attributes_processor_;
void *state_;
std::unique_ptr<AttributesHashMap> cumulative_hash_map_;
std::unique_ptr<AttributesHashMap> delta_hash_map_;
TemporalMetricStorage temporal_metric_storage_;
};

2 changes: 1 addition & 1 deletion sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
@@ -257,7 +257,7 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
{
view_instr_desc.description_ = view.GetDescription();
}
auto storage = std::shared_ptr<AsyncMetricStorage<long>>(new AsyncMetricStorage<long>(
auto storage = std::shared_ptr<AsyncMetricStorage>(new AsyncMetricStorage(
view_instr_desc, view.GetAggregationType(), &view.GetAttributesProcessor()));
storage_registry_[instrument_descriptor.name_] = storage;
static_cast<AsyncMultiMetricStorage *>(storages.get())->AddStorage(storage);
6 changes: 3 additions & 3 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
@@ -96,9 +96,9 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation)
long value = 0;

MeasurementFetcher measurement_fetcher;
opentelemetry::sdk::metrics::AsyncMetricStorage<long> storage(instr_desc, AggregationType::kSum,
// MeasurementFetcher::Fetcher,
new DefaultAttributesProcessor());
opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kSum,
// MeasurementFetcher::Fetcher,
new DefaultAttributesProcessor());

storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts,
[&](const MetricData data) {