Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple async callbacks #1495

Merged
merged 32 commits into from
Aug 5, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix
lalitb committed Aug 2, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 152a3a3d1280e4b281a15d3d10de75ea405835ea
1 change: 0 additions & 1 deletion api/include/opentelemetry/metrics/async_instruments.h
Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@ OPENTELEMETRY_BEGIN_NAMESPACE
namespace metrics
{

// typedef void (*ObservableCallbackPtr)(ObserverResult &, void *);
using ObservableCallbackPtr = void (*)(ObserverResult, void *);

class ObservableInstrument
3 changes: 2 additions & 1 deletion examples/common/metrics_foo_library/foo_library.cc
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ std::map<std::string, std::string> get_random_attr()
class MeasurementFetcher
{
public:
static void Fetcher(opentelemetry::metrics::ObserverResult &observer_result, void *state)
static void Fetcher(opentelemetry::metrics::ObserverResult observer_result, void *state)
{
std::map<std::string, std::string> labels = get_random_attr();
auto labelkv = opentelemetry::common::KeyValueIterableView<decltype(labels)>{labels};
@@ -69,6 +69,7 @@ void foo_library::observable_counter_example(const std::string &name)
auto provider = metrics_api::Provider::GetMeterProvider();
nostd::shared_ptr<metrics_api::Meter> meter = provider->GetMeter(name, "1.2.0");
auto counter = meter->CreateDoubleObservableCounter(counter_name);
counter->AddCallback(MeasurementFetcher::Fetcher, nullptr);
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
Original file line number Diff line number Diff line change
@@ -34,16 +34,15 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
aggregation_type_{aggregation_type},
attributes_processor_{attributes_processor},
state_{state},
delta_hash_map_(new AttributesHashMap()),
cumulative_hash_map_(new AttributesHashMap()),
delta_hash_map_(new AttributesHashMap()),
temporal_metric_storage_(instrument_descriptor, aggregation_config)
{}

template <class T>
void Record(const std::unordered_map<MetricAttributes, T, AttributeHashGenerator> &measurements,
opentelemetry::common::SystemTimestamp observation_time) noexcept
{
// std::shared_ptr<AttributesHashMap> delta_hash_map(new AttributesHashMap());
// process the read measurements - aggregate and store in hashmap
for (auto &measurement : measurements)
{
@@ -105,7 +104,6 @@ class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStora
private:
InstrumentDescriptor instrument_descriptor_;
AggregationType aggregation_type_;
void (*measurement_collection_callback_)(opentelemetry::metrics::ObserverResult &, void *);
const AttributesProcessor *attributes_processor_;
void *state_;
std::unique_ptr<AttributesHashMap> cumulative_hash_map_;
6 changes: 6 additions & 0 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
@@ -261,6 +261,12 @@ std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
static_cast<AsyncMultiMetricStorage *>(storages.get())->AddStorage(storage);
return true;
});
if (!success)
{
OTEL_INTERNAL_LOG_ERROR(
"[Meter::RegisterAsyncMetricStorage] - Error during finding matching views."
<< "Some of the matching view configurations mayn't be used for metric collection");
}
return storages;
}

6 changes: 3 additions & 3 deletions sdk/src/metrics/state/observable_registry.cc
Original file line number Diff line number Diff line change
@@ -21,27 +21,27 @@ void ObservableRegistry::AddCallback(opentelemetry::metrics::ObservableCallbackP
// TBD - Check if existing
std::unique_ptr<ObservableCallbackRecord> record(
new ObservableCallbackRecord{callback, state, instrument});
std::unique_lock<std::mutex> lk(callbacks_m_);
std::lock_guard<std::mutex> lock_guard{callbacks_m_};
callbacks_.push_back(std::move(record));
}

void ObservableRegistry::RemoveCallback(opentelemetry::metrics::ObservableCallbackPtr callback,
void *state,
opentelemetry::metrics::ObservableInstrument *instrument)
{
std::lock_guard<std::mutex> lock_guard{callbacks_m_};
auto new_end = std::remove_if(
callbacks_.begin(), callbacks_.end(),
[callback, state, instrument](const std::unique_ptr<ObservableCallbackRecord> &record) {
return record->callback == callback && record->state == state &&
record->instrument == instrument;
});
std::unique_lock<std::mutex> lk(callbacks_m_);
callbacks_.erase(new_end, callbacks_.end());
}

void ObservableRegistry::Observe(opentelemetry::common::SystemTimestamp collection_ts)
{
std::unique_lock<std::mutex> lk(callbacks_m_);
std::lock_guard<std::mutex> lock_guard{callbacks_m_};
for (auto &callback_wrap : callbacks_)
{
auto value_type =
1 change: 0 additions & 1 deletion sdk/test/metrics/async_instruments_test.cc
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ TEST(AsyncInstruments, ObservableInstrument)
InstrumentType::kObservableCounter,
InstrumentValueType::kLong};
std::unique_ptr<AsyncWritableMetricStorage> metric_storage(new AsyncMultiMetricStorage());
auto asyc_generate_meas_long = [](opentelemetry::metrics::ObserverResultT<long> &observer) {};
ObservableInstrument observable_counter_long(instrument_descriptor, std::move(metric_storage));
observable_counter_long.AddCallback(asyc_generate_measurements, nullptr);
}