diff --git a/CHANGELOG.md b/CHANGELOG.md index eb407ac96a..faa1c5b36a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,9 @@ Increment the: ## [Unreleased] +* [TEST] Add multi-threaded metrics benchmarks for shared vs per-thread counter + [#3865](https://github.com/open-telemetry/opentelemetry-cpp/pull/3865) + * [BUILD] Avoid break caused by max() macro on windows [#3863](https://github.com/open-telemetry/opentelemetry-cpp/pull/3863) diff --git a/sdk/test/metrics/measurements_benchmark.cc b/sdk/test/metrics/measurements_benchmark.cc index 015f82937d..4b41d490f4 100644 --- a/sdk/test/metrics/measurements_benchmark.cc +++ b/sdk/test/metrics/measurements_benchmark.cc @@ -2,10 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 #include -#include #include #include #include +#include #include #include #include @@ -17,6 +17,7 @@ #include "opentelemetry/metrics/meter.h" #include "opentelemetry/metrics/sync_instruments.h" #include "opentelemetry/nostd/function_ref.h" +#include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/nostd/string_view.h" #include "opentelemetry/nostd/unique_ptr.h" #include "opentelemetry/sdk/common/attribute_utils.h" @@ -52,6 +53,21 @@ class MockMetricExporter : public MetricReader namespace { + +size_t GetBenchmarkThreads() +{ + const char *env = std::getenv("BENCHMARK_THREADS"); + if (env != nullptr && env[0] != '\0') + { + int val = std::atoi(env); + if (val > 0) + { + return static_cast(val); + } + } + return 4; // default +} + void BM_MeasurementsTest(benchmark::State &state) { MeterProvider mp; @@ -97,5 +113,104 @@ void BM_MeasurementsTest(benchmark::State &state) } BENCHMARK(BM_MeasurementsTest); +void BM_MeasurementsThreadsShareCounterTest(benchmark::State &state) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + + std::shared_ptr exporter(new MockMetricExporter()); + mp.AddMetricReader(exporter); + auto h = m->CreateDoubleCounter("counter1", "counter1_description", "counter1_unit"); + size_t MAX_MEASUREMENTS = 10000; // keep low to prevent CI failure due to timeout + size_t NUM_CORES = GetBenchmarkThreads(); + std::vector threads; + std::map attributes[1000]; + size_t total_index = 0; + for (uint32_t i = 0; i < 10; i++) + { + for (uint32_t j = 0; j < 10; j++) + for (uint32_t k = 0; k < 10; k++) + attributes[total_index++] = {{"dim1", i}, {"dim2", j}, {"dim3", k}}; + } + while (state.KeepRunning()) + { + threads.clear(); + std::atomic cur_processed{0}; + for (size_t i = 0; i < NUM_CORES; i++) + { + threads.push_back(std::thread( + [&h, &cur_processed, &MAX_MEASUREMENTS, &attributes](size_t /*thread_id*/) { + while (cur_processed++ <= MAX_MEASUREMENTS) + { + size_t index = rand() % 1000; + h->Add(1.0, + opentelemetry::common::KeyValueIterableView>( + attributes[index]), + opentelemetry::context::Context{}); + } + }, + i)); + } + for (auto &thread : threads) + { + thread.join(); + } + } + exporter->Collect([&](ResourceMetrics & /*rm*/) { return true; }); +} +BENCHMARK(BM_MeasurementsThreadsShareCounterTest); + +void BM_MeasurementsPerThreadCounterTest(benchmark::State &state) +{ + MeterProvider mp; + auto m = mp.GetMeter("meter1", "version1", "schema1"); + + std::shared_ptr exporter(new MockMetricExporter()); + mp.AddMetricReader(exporter); + size_t MAX_MEASUREMENTS = 10000; // keep low to prevent CI failure due to timeout + size_t NUM_CORES = GetBenchmarkThreads(); + std::vector threads; + std::map attributes[1000]; + size_t total_index = 0; + for (uint32_t i = 0; i < 10; i++) + { + for (uint32_t j = 0; j < 10; j++) + for (uint32_t k = 0; k < 10; k++) + attributes[total_index++] = {{"dim1", i}, {"dim2", j}, {"dim3", k}}; + } + while (state.KeepRunning()) + { + threads.clear(); + std::atomic cur_processed{0}; + for (size_t i = 0; i < NUM_CORES; i++) + { + threads.push_back(std::thread( + [&m, &cur_processed, &MAX_MEASUREMENTS, &attributes](size_t thread_id) { + // Each thread creates its own counter with the same name but a unique description + // encoding the thread id, ensuring no shared underlying storage. + std::string description = "counter1_description_thread_" + std::to_string(thread_id); + auto per_thread_counter = + m->CreateDoubleCounter("counter1", description, "counter1_unit"); + while (cur_processed++ <= MAX_MEASUREMENTS) + { + size_t index = rand() % 1000; + per_thread_counter->Add( + 1.0, + opentelemetry::common::KeyValueIterableView>( + attributes[index]), + opentelemetry::context::Context{}); + } + }, + i)); + } + for (auto &thread : threads) + { + thread.join(); + } + } + exporter->Collect([&](ResourceMetrics & /*rm*/) { return true; }); +} +BENCHMARK(BM_MeasurementsPerThreadCounterTest); + } // namespace BENCHMARK_MAIN();