Skip to content

Commit

Permalink
Convert Prometheus Exporter to Pull MetricReader (#1953)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Feb 5, 2023
1 parent 9200d0c commit c4c8ee0
Show file tree
Hide file tree
Showing 14 changed files with 198 additions and 760 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Increment the:

## [Unreleased]

* Convert Prometheus Exporter to Pull MetricReader [#1953](https://github.com/open-telemetry/opentelemetry-cpp/pull/1953)
* Upgrade prometheus-cpp to v1.1.0 [#1954](https://github.com/open-telemetry/opentelemetry-cpp/pull/1954)

## [1.8.2] 2023-01-31
Expand Down
4 changes: 3 additions & 1 deletion ci/do_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ elif [[ "$1" == "bazel.asan" ]]; then
//examples/metrics_simple:metrics_ostream_example > /dev/null
exit 0
elif [[ "$1" == "bazel.tsan" ]]; then
bazel $BAZEL_STARTUP_OPTIONS test --config=tsan $BAZEL_TEST_OPTIONS_ASYNC //...
# TODO - potential race condition in Civetweb server used by prometheus-cpp during shutdown
# https://github.com/civetweb/civetweb/issues/861, so removing prometheus from the test
bazel $BAZEL_STARTUP_OPTIONS test --config=tsan $BAZEL_TEST_OPTIONS_ASYNC -- //... -//exporters/prometheus/...
bazel $BAZEL_STARTUP_OPTIONS run --config=tsan $BAZEL_TEST_OPTIONS_ASYNC \
//examples/metrics_simple:metrics_ostream_example > /dev/null
exit 0
Expand Down
13 changes: 4 additions & 9 deletions examples/prometheus/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,16 @@ void InitMetrics(const std::string &name, const std::string &addr)
}
std::puts("PrometheusExporter example program running ...");

std::unique_ptr<metrics_sdk::PushMetricExporter> exporter{
new metrics_exporter::PrometheusExporter(opts)};

std::string version{"1.2.0"};
std::string schema{"https://opentelemetry.io/schemas/1.2.0"};

std::shared_ptr<metrics_sdk::MetricReader> prometheus_exporter(
new metrics_exporter::PrometheusExporter(opts));

// Initialize and set the global MeterProvider
metrics_sdk::PeriodicExportingMetricReaderOptions options;
options.export_interval_millis = std::chrono::milliseconds(1000);
options.export_timeout_millis = std::chrono::milliseconds(500);
std::unique_ptr<metrics_sdk::MetricReader> reader{
new metrics_sdk::PeriodicExportingMetricReader(std::move(exporter), options)};
auto provider = std::shared_ptr<metrics_api::MeterProvider>(new metrics_sdk::MeterProvider());
auto p = std::static_pointer_cast<metrics_sdk::MeterProvider>(provider);
p->AddMetricReader(std::move(reader));
p->AddMetricReader(prometheus_exporter);

// counter view
std::string counter_name = name + "_counter";
Expand Down
2 changes: 2 additions & 0 deletions exporters/prometheus/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ cc_test(
deps = [
":prometheus_exporter",
":prometheus_test_helper",
"//sdk/src/metrics",
"@com_google_googletest//:gtest_main",
],
)
Expand All @@ -112,6 +113,7 @@ cc_test(
deps = [
":prometheus_collector",
":prometheus_test_helper",
"//sdk/src/metrics",
"@com_google_googletest//:gtest_main",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <prometheus/collectable.h>
#include <prometheus/metric_family.h>
#include "opentelemetry/exporters/prometheus/exporter_utils.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"

namespace prometheus_client = ::prometheus;

Expand All @@ -30,7 +31,7 @@ class PrometheusCollector : public prometheus_client::Collectable
* This constructor initializes the collection for metrics to export
* in this class with default capacity
*/
explicit PrometheusCollector(size_t max_collection_size = 2048);
explicit PrometheusCollector(sdk::metrics::MetricReader *reader);

/**
* Collects all metrics data from metricsToCollect collection.
Expand All @@ -39,41 +40,8 @@ class PrometheusCollector : public prometheus_client::Collectable
*/
std::vector<prometheus_client::MetricFamily> Collect() const override;

/**
* This function is called by export() function and add the collection of
* records to the metricsToCollect collection
*
* @param records a collection of records to add to the metricsToCollect collection
*/
void AddMetricData(const sdk::metrics::ResourceMetrics &data);

/**
* Get the current collection in the collector.
*
* @return the current metricsToCollect collection
*/
std::vector<std::unique_ptr<sdk::metrics::ResourceMetrics>> &GetCollection();

/**
* Gets the maximum size of the collection.
*
* @return max collection size
*/
int GetMaxCollectionSize() const;

private:
/**
* Collection of metrics data from the export() function, and to be export
* to user when they send a pull request. This collection is a pointer
* to a collection so Collect() is able to clear the collection, even
* though it is a const function.
*/
mutable std::vector<std::unique_ptr<sdk::metrics::ResourceMetrics>> metrics_to_collect_;

/**
* Maximum size of the metricsToCollect collection.
*/
size_t max_collection_size_;
sdk::metrics::MetricReader *reader_;

/*
* Lock when operating the metricsToCollect collection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "opentelemetry/exporters/prometheus/collector.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/sdk/common/env_variables.h"
#include "opentelemetry/sdk/metrics/push_metric_exporter.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include "opentelemetry/version.h"

/**
Expand Down Expand Up @@ -46,7 +46,7 @@ struct PrometheusExporterOptions
std::string url = GetPrometheusDefaultHttpEndpoint();
};

class PrometheusExporter : public sdk::metrics::PushMetricExporter
class PrometheusExporter : public sdk::metrics::MetricReader
{
public:
/**
Expand All @@ -56,54 +56,12 @@ class PrometheusExporter : public sdk::metrics::PushMetricExporter
*/
PrometheusExporter(const PrometheusExporterOptions &options);

/**
* Get the AggregationTemporality for Prometheus exporter
*
* @return AggregationTemporality
*/
sdk::metrics::AggregationTemporality GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept override;

/**
* Exports a batch of Metric Records.
* @param records: a collection of records to export
* @return: returns a ReturnCode detailing a success, or type of failure
*/
sdk::common::ExportResult Export(const sdk::metrics::ResourceMetrics &data) noexcept override;

/**
* Force flush the exporter.
*/
bool ForceFlush(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;

/**
* Shuts down the exporter and does cleanup.
* Since Prometheus is a pull based interface,
* we cannot serve data remaining in the intermediate
* collection to to client an HTTP request being sent,
* so we flush the data.
*/
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override;

/**
* @return: returns a shared_ptr to
* the PrometheusCollector instance
*/
std::shared_ptr<PrometheusCollector> &GetCollector();

/**
* @return: Gets the shutdown status of the exporter
*/
bool IsShutdown() const;

private:
// The configuration options associated with this exporter.
const PrometheusExporterOptions options_;
/**
* exporter shutdown status
*/
bool is_shutdown_;

/**
* Pointer to a
Expand All @@ -117,16 +75,11 @@ class PrometheusExporter : public sdk::metrics::PushMetricExporter
*/
std::unique_ptr<::prometheus::Exposer> exposer_;

/**
* friend class for testing
*/
friend class PrometheusExporterTest;
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override;

/**
* PrometheusExporter constructor with no parameters
* Used for testing only
*/
PrometheusExporter();
bool OnShutDown(std::chrono::microseconds timeout) noexcept override;

void OnInitialized() noexcept override;
};
} // namespace metrics
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class PrometheusExporterUtils
* @return a collection of translated metrics that is acceptable by Prometheus
*/
static std::vector<::prometheus::MetricFamily> TranslateToPrometheus(
const std::vector<std::unique_ptr<sdk::metrics::ResourceMetrics>> &data);
const sdk::metrics::ResourceMetrics &data);

private:
/**
Expand Down
60 changes: 10 additions & 50 deletions exporters/prometheus/src/collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ namespace metrics
* This constructor initializes the collection for metrics to export
* in this class with default capacity
*/
PrometheusCollector::PrometheusCollector(size_t max_collection_size)
: max_collection_size_(max_collection_size)
{}
PrometheusCollector::PrometheusCollector(sdk::metrics::MetricReader *reader) : reader_(reader) {}

/**
* Collects all metrics data from metricsToCollect collection.
Expand All @@ -27,59 +25,21 @@ PrometheusCollector::PrometheusCollector(size_t max_collection_size)
*/
std::vector<prometheus_client::MetricFamily> PrometheusCollector::Collect() const
{
this->collection_lock_.lock();
if (metrics_to_collect_.empty())
if (reader_->IsShutdown())
{
this->collection_lock_.unlock();
return {};
}
collection_lock_.lock();

std::vector<prometheus_client::MetricFamily> result;

// copy the intermediate collection, and then clear it
std::vector<std::unique_ptr<sdk::metrics::ResourceMetrics>> copied_data;
copied_data.swap(metrics_to_collect_);
this->collection_lock_.unlock();

result = PrometheusExporterUtils::TranslateToPrometheus(copied_data);
return result;
}

/**
* This function is called by export() function and add the collection of
* records to the metricsToCollect collection
*
* @param records a collection of records to add to the metricsToCollect collection
*/
void PrometheusCollector::AddMetricData(const sdk::metrics::ResourceMetrics &data)
{
collection_lock_.lock();
if (metrics_to_collect_.size() + 1 <= max_collection_size_)
{
// We can not use initializer lists here due to broken variadic capture on GCC 4.8.5
metrics_to_collect_.emplace_back(new sdk::metrics::ResourceMetrics(data));
}
reader_->Collect([&result](sdk::metrics::ResourceMetrics &metric_data) {
auto prometheus_metric_data = PrometheusExporterUtils::TranslateToPrometheus(metric_data);
for (auto &data : prometheus_metric_data)
result.emplace_back(data);
return true;
});
collection_lock_.unlock();
}

/**
* Get the current collection in the collector.
*
* @return the current metrics_to_collect collection
*/
std::vector<std::unique_ptr<sdk::metrics::ResourceMetrics>> &PrometheusCollector::GetCollection()
{
return metrics_to_collect_;
}

/**
* Gets the maximum size of the collection.
*
* @return max collection size
*/
int PrometheusCollector::GetMaxCollectionSize() const
{
return max_collection_size_;
return result;
}

} // namespace metrics
Expand Down
Loading

0 comments on commit c4c8ee0

Please sign in to comment.