Skip to content

Commit

Permalink
Merge branch 'main' into metrics-race-condition
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomsonTan authored Aug 11, 2022
2 parents 6cacba1 + 4cf41c4 commit 88a314b
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 24 deletions.
4 changes: 2 additions & 2 deletions sdk/include/opentelemetry/sdk/metrics/meter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Meter final : public opentelemetry::metrics::Meter
public:
/** Construct a new Meter with the given pipeline. */
explicit Meter(
std::shared_ptr<sdk::metrics::MeterContext> meter_context,
std::weak_ptr<sdk::metrics::MeterContext> meter_context,
std::unique_ptr<opentelemetry::sdk::instrumentationscope::InstrumentationScope> scope =
opentelemetry::sdk::instrumentationscope::InstrumentationScope::Create("")) noexcept;

Expand Down Expand Up @@ -111,7 +111,7 @@ class Meter final : public opentelemetry::metrics::Meter
// order of declaration is important here - instrumentation scope should destroy after
// meter-context.
std::unique_ptr<sdk::instrumentationscope::InstrumentationScope> scope_;
std::shared_ptr<sdk::metrics::MeterContext> meter_context_;
std::weak_ptr<sdk::metrics::MeterContext> meter_context_;
// Mapping between instrument-name and Aggregation Storage.
std::unordered_map<std::string, std::shared_ptr<MetricStorage>> storage_registry_;
std::unique_ptr<SyncWritableMetricStorage> RegisterSyncMetricStorage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class CollectorHandle
class MetricCollector : public MetricProducer, public CollectorHandle
{
public:
MetricCollector(std::shared_ptr<MeterContext> &&context,
std::unique_ptr<MetricReader> metric_reader);
MetricCollector(MeterContext *context, std::unique_ptr<MetricReader> metric_reader);

AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) noexcept override;
Expand All @@ -50,7 +49,7 @@ class MetricCollector : public MetricProducer, public CollectorHandle
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept;

private:
std::shared_ptr<MeterContext> meter_context_;
MeterContext *meter_context_;
std::shared_ptr<MetricReader> metric_reader_;
};
} // namespace metrics
Expand Down
33 changes: 27 additions & 6 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace metrics = opentelemetry::metrics;
namespace nostd = opentelemetry::nostd;

Meter::Meter(
std::shared_ptr<MeterContext> meter_context,
std::weak_ptr<MeterContext> meter_context,
std::unique_ptr<sdk::instrumentationscope::InstrumentationScope> instrumentation_scope) noexcept
: scope_{std::move(instrumentation_scope)}, meter_context_{meter_context}
{}
Expand Down Expand Up @@ -204,7 +204,14 @@ const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentation
std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
InstrumentDescriptor &instrument_descriptor)
{
auto view_registry = meter_context_->GetViewRegistry();
auto ctx = meter_context_.lock();
if (!ctx)
{
OTEL_INTERNAL_LOG_ERROR("[Meter::RegisterMetricStorage] - Error during finding matching views."
<< "The metric context is invalid");
return nullptr;
}
auto view_registry = ctx->GetViewRegistry();
std::unique_ptr<SyncWritableMetricStorage> storages(new SyncMultiMetricStorage());

auto success = view_registry->FindViews(
Expand Down Expand Up @@ -240,7 +247,15 @@ std::unique_ptr<SyncWritableMetricStorage> Meter::RegisterSyncMetricStorage(
std::unique_ptr<AsyncWritableMetricStorage> Meter::RegisterAsyncMetricStorage(
InstrumentDescriptor &instrument_descriptor)
{
auto view_registry = meter_context_->GetViewRegistry();
auto ctx = meter_context_.lock();
if (!ctx)
{
OTEL_INTERNAL_LOG_ERROR(
"[Meter::RegisterAsyncMetricStorage] - Error during finding matching views."
<< "The metric context is invalid");
return nullptr;
}
auto view_registry = ctx->GetViewRegistry();
std::unique_ptr<AsyncWritableMetricStorage> storages(new AsyncMultiMetricStorage());
auto success = view_registry->FindViews(
instrument_descriptor, *GetInstrumentationScope(),
Expand Down Expand Up @@ -276,11 +291,17 @@ std::vector<MetricData> Meter::Collect(CollectorHandle *collector,
{

std::vector<MetricData> metric_data_list;
auto ctx = meter_context_.lock();
if (!ctx)
{
OTEL_INTERNAL_LOG_ERROR("[Meter::Collect] - Error during collection."
<< "The metric context is invalid");
return std::vector<MetricData>{};
}
for (auto &metric_storage : storage_registry_)
{
metric_storage.second->Collect(collector, meter_context_->GetCollectors(),
meter_context_->GetSDKStartTime(), collect_ts,
[&metric_data_list](MetricData metric_data) {
metric_storage.second->Collect(collector, ctx->GetCollectors(), ctx->GetSDKStartTime(),
collect_ts, [&metric_data_list](MetricData metric_data) {
metric_data_list.push_back(metric_data);
return true;
});
Expand Down
3 changes: 1 addition & 2 deletions sdk/src/metrics/meter_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ opentelemetry::common::SystemTimestamp MeterContext::GetSDKStartTime() noexcept

void MeterContext::AddMetricReader(std::unique_ptr<MetricReader> reader) noexcept
{
auto collector =
std::shared_ptr<MetricCollector>{new MetricCollector(shared_from_this(), std::move(reader))};
auto collector = std::shared_ptr<MetricCollector>{new MetricCollector(this, std::move(reader))};
collectors_.push_back(collector);
}

Expand Down
13 changes: 9 additions & 4 deletions sdk/src/metrics/state/metric_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ namespace sdk
namespace metrics
{

MetricCollector::MetricCollector(
std::shared_ptr<opentelemetry::sdk::metrics::MeterContext> &&context,
std::unique_ptr<MetricReader> metric_reader)
: meter_context_{std::move(context)}, metric_reader_{std::move(metric_reader)}
MetricCollector::MetricCollector(opentelemetry::sdk::metrics::MeterContext *context,
std::unique_ptr<MetricReader> metric_reader)
: meter_context_{context}, metric_reader_{std::move(metric_reader)}
{
metric_reader_->SetMetricProducer(this);
}
Expand All @@ -33,6 +32,12 @@ AggregationTemporality MetricCollector::GetAggregationTemporality(
bool MetricCollector::Collect(
nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept
{
if (!meter_context_)
{
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting."
<< "The metric context is invalid");
return false;
}
ResourceMetrics resource_metrics;
for (auto &meter : meter_context_->GetMeters())
{
Expand Down
7 changes: 7 additions & 0 deletions sdk/src/metrics/state/observable_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# include "opentelemetry/sdk/metrics/async_instruments.h"
# include "opentelemetry/sdk/metrics/observer_result.h"
# include "opentelemetry/sdk/metrics/state/metric_storage.h"
# include "opentelemetry/sdk_config.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
Expand Down Expand Up @@ -51,6 +52,12 @@ void ObservableRegistry::Observe(opentelemetry::common::SystemTimestamp collecti
auto storage =
static_cast<opentelemetry::sdk::metrics::ObservableInstrument *>(callback_wrap->instrument)
->GetMetricStorage();
if (!storage)
{
OTEL_INTERNAL_LOG_ERROR("[ObservableRegistry::Observe] - Error during observe."
<< "The metric storage is invalid");
return;
}
if (value_type == InstrumentValueType::kDouble)
{
nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<double>> ob_res(
Expand Down
Loading

0 comments on commit 88a314b

Please sign in to comment.