Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ Increment the:

## [Unreleased]

* [TEST] Add multi-threaded metrics benchmarks for shared vs per-thread counter
[#3865](https://github.com/open-telemetry/opentelemetry-cpp/pull/3865)

* [BUILD] Avoid break caused by max() macro on windows
[#3863](https://github.com/open-telemetry/opentelemetry-cpp/pull/3863)

Expand Down
117 changes: 116 additions & 1 deletion sdk/test/metrics/measurements_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

#include <benchmark/benchmark.h>
#include <stdlib.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <map>
#include <string>
#include <thread>
Expand All @@ -17,6 +17,7 @@
#include "opentelemetry/metrics/meter.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/nostd/unique_ptr.h"
#include "opentelemetry/sdk/common/attribute_utils.h"
Expand Down Expand Up @@ -52,6 +53,21 @@ class MockMetricExporter : public MetricReader

namespace
{

size_t GetBenchmarkThreads()
{
const char *env = std::getenv("BENCHMARK_THREADS");
if (env != nullptr && env[0] != '\0')
{
int val = std::atoi(env);
if (val > 0)
{
return static_cast<size_t>(val);
}
}
return 4; // default
}

void BM_MeasurementsTest(benchmark::State &state)
{
MeterProvider mp;
Expand Down Expand Up @@ -97,5 +113,104 @@ void BM_MeasurementsTest(benchmark::State &state)
}
BENCHMARK(BM_MeasurementsTest);

void BM_MeasurementsThreadsShareCounterTest(benchmark::State &state)
{
MeterProvider mp;
auto m = mp.GetMeter("meter1", "version1", "schema1");

std::shared_ptr<MetricReader> exporter(new MockMetricExporter());
mp.AddMetricReader(exporter);
auto h = m->CreateDoubleCounter("counter1", "counter1_description", "counter1_unit");
size_t MAX_MEASUREMENTS = 10000; // keep low to prevent CI failure due to timeout
size_t NUM_CORES = GetBenchmarkThreads();
std::vector<std::thread> threads;
std::map<std::string, uint32_t> attributes[1000];
size_t total_index = 0;
for (uint32_t i = 0; i < 10; i++)
{
for (uint32_t j = 0; j < 10; j++)
for (uint32_t k = 0; k < 10; k++)
attributes[total_index++] = {{"dim1", i}, {"dim2", j}, {"dim3", k}};
}
while (state.KeepRunning())
{
threads.clear();
std::atomic<size_t> cur_processed{0};
for (size_t i = 0; i < NUM_CORES; i++)
{
threads.push_back(std::thread(
[&h, &cur_processed, &MAX_MEASUREMENTS, &attributes](size_t /*thread_id*/) {
while (cur_processed++ <= MAX_MEASUREMENTS)
{
size_t index = rand() % 1000;
h->Add(1.0,
opentelemetry::common::KeyValueIterableView<std::map<std::string, uint32_t>>(
attributes[index]),
opentelemetry::context::Context{});
}
},
i));
}
for (auto &thread : threads)
{
thread.join();
}
}
exporter->Collect([&](ResourceMetrics & /*rm*/) { return true; });
}
BENCHMARK(BM_MeasurementsThreadsShareCounterTest);

void BM_MeasurementsPerThreadCounterTest(benchmark::State &state)
{
MeterProvider mp;
auto m = mp.GetMeter("meter1", "version1", "schema1");

std::shared_ptr<MetricReader> exporter(new MockMetricExporter());
mp.AddMetricReader(exporter);
size_t MAX_MEASUREMENTS = 10000; // keep low to prevent CI failure due to timeout
size_t NUM_CORES = GetBenchmarkThreads();
std::vector<std::thread> threads;
std::map<std::string, uint32_t> attributes[1000];
size_t total_index = 0;
for (uint32_t i = 0; i < 10; i++)
{
for (uint32_t j = 0; j < 10; j++)
for (uint32_t k = 0; k < 10; k++)
attributes[total_index++] = {{"dim1", i}, {"dim2", j}, {"dim3", k}};
}
while (state.KeepRunning())
{
threads.clear();
std::atomic<size_t> cur_processed{0};
for (size_t i = 0; i < NUM_CORES; i++)
{
threads.push_back(std::thread(
[&m, &cur_processed, &MAX_MEASUREMENTS, &attributes](size_t thread_id) {
// Each thread creates its own counter with the same name but a unique description
// encoding the thread id, ensuring no shared underlying storage.
std::string description = "counter1_description_thread_" + std::to_string(thread_id);
auto per_thread_counter =
m->CreateDoubleCounter("counter1", description, "counter1_unit");
while (cur_processed++ <= MAX_MEASUREMENTS)
{
size_t index = rand() % 1000;
per_thread_counter->Add(
1.0,
opentelemetry::common::KeyValueIterableView<std::map<std::string, uint32_t>>(
attributes[index]),
opentelemetry::context::Context{});
}
},
i));
}
for (auto &thread : threads)
{
thread.join();
}
}
exporter->Collect([&](ResourceMetrics & /*rm*/) { return true; });
}
BENCHMARK(BM_MeasurementsPerThreadCounterTest);

} // namespace
BENCHMARK_MAIN();