From 47b8d4e897cd6e7da5067462924706ec7802ee4c Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Fri, 15 Mar 2024 19:35:32 +0800 Subject: [PATCH] [Enhancement] Add thread pool metrics (backport #40171) (backport #42351) (#42683) Signed-off-by: PengFei Li --- be/src/agent/agent_server.cpp | 3 +- .../persistent_index_compaction_manager.cpp | 3 +- be/src/storage/storage_engine.cpp | 19 +-- be/src/storage/update_manager.cpp | 3 +- be/src/util/starrocks_metrics.h | 35 +++-- be/src/util/threadpool.cpp | 7 + be/src/util/threadpool.h | 16 +++ be/test/CMakeLists.txt | 1 + be/test/util/core_local_counter_test.cpp | 132 ++++++++++++++++++ be/test/util/starrocks_metrics_test.cpp | 14 ++ 10 files changed, 206 insertions(+), 27 deletions(-) create mode 100644 be/test/util/core_local_counter_test.cpp diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 7037554e343ae..51b6955bc1f45 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -172,8 +172,7 @@ void AgentServer::Impl::init_or_die() { BUILD_DYNAMIC_TASK_THREAD_POOL("publish_version", MIN_TRANSACTION_PUBLISH_WORKER_COUNT, max_publish_version_worker_count, DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE, _thread_pool_publish_version); - REGISTER_GAUGE_STARROCKS_METRIC(publish_version_queue_count, - [this]() { return _thread_pool_publish_version->num_queued_tasks(); }); + REGISTER_THREAD_POOL_METRICS(publish_version, _thread_pool_publish_version); #endif BUILD_DYNAMIC_TASK_THREAD_POOL("drop", 1, config::drop_tablet_worker_count, std::numeric_limits::max(), diff --git a/be/src/storage/persistent_index_compaction_manager.cpp b/be/src/storage/persistent_index_compaction_manager.cpp index 795213e953942..23c327549f211 100644 --- a/be/src/storage/persistent_index_compaction_manager.cpp +++ b/be/src/storage/persistent_index_compaction_manager.cpp @@ -37,8 +37,7 @@ Status PersistentIndexCompactionManager::init() { .set_min_threads(1) .set_max_threads(max_pk_index_compaction_thread_cnt) .build(&_worker_thread_pool)); - REGISTER_GAUGE_STARROCKS_METRIC(pk_index_compaction_queue_count, - [this]() { return _worker_thread_pool->num_queued_tasks(); }); + REGISTER_THREAD_POOL_METRICS(pk_index_compaction, _worker_thread_pool); return Status::OK(); } diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index 8c9450103ecf0..db0ffff7ea373 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -217,28 +217,21 @@ Status StorageEngine::_open(const EngineOptions& options) { _async_delta_writer_executor = std::make_unique(thread_pool.release(), kTakesOwnership); - REGISTER_GAUGE_STARROCKS_METRIC(async_delta_writer_queue_count, [this]() { - return static_cast(_async_delta_writer_executor.get()) - ->get_thread_pool() - ->num_queued_tasks(); - }) + REGISTER_THREAD_POOL_METRICS( + async_delta_writer, + static_cast(_async_delta_writer_executor.get())->get_thread_pool()); _memtable_flush_executor = std::make_unique(); RETURN_IF_ERROR_WITH_WARN(_memtable_flush_executor->init(dirs), "init MemTableFlushExecutor failed"); - REGISTER_GAUGE_STARROCKS_METRIC(memtable_flush_queue_count, [this]() { - return _memtable_flush_executor->get_thread_pool()->num_queued_tasks(); - }) + REGISTER_THREAD_POOL_METRICS(memtable_flush, _memtable_flush_executor->get_thread_pool()); _segment_flush_executor = std::make_unique(); RETURN_IF_ERROR_WITH_WARN(_segment_flush_executor->init(dirs), "init SegmentFlushExecutor failed"); - REGISTER_GAUGE_STARROCKS_METRIC(segment_flush_queue_count, - [this]() { return _segment_flush_executor->get_thread_pool()->num_queued_tasks(); }) + REGISTER_THREAD_POOL_METRICS(segment_flush, _segment_flush_executor->get_thread_pool()); _segment_replicate_executor = std::make_unique(); RETURN_IF_ERROR_WITH_WARN(_segment_replicate_executor->init(dirs), "init SegmentReplicateExecutor failed"); - REGISTER_GAUGE_STARROCKS_METRIC(segment_replicate_queue_count, [this]() { - return _segment_replicate_executor->get_thread_pool()->num_queued_tasks(); - }) + REGISTER_THREAD_POOL_METRICS(segment_replicate, _segment_replicate_executor->get_thread_pool()); RETURN_IF_ERROR_WITH_WARN(_replication_txn_manager->init(dirs), "init ReplicationTxnManager failed"); diff --git a/be/src/storage/update_manager.cpp b/be/src/storage/update_manager.cpp index b34b29b816a09..d92f4592facc6 100644 --- a/be/src/storage/update_manager.cpp +++ b/be/src/storage/update_manager.cpp @@ -97,8 +97,7 @@ Status UpdateManager::init() { max_thread_cnt = config::transaction_apply_worker_count; } RETURN_IF_ERROR(ThreadPoolBuilder("update_apply").set_max_threads(max_thread_cnt).build(&_apply_thread_pool)); - REGISTER_GAUGE_STARROCKS_METRIC(update_apply_queue_count, - [this]() { return _apply_thread_pool->num_queued_tasks(); }); + REGISTER_THREAD_POOL_METRICS(update_apply, _apply_thread_pool); int max_get_thread_cnt = config::get_pindex_worker_count > max_thread_cnt ? config::get_pindex_worker_count : max_thread_cnt * 2; diff --git a/be/src/util/starrocks_metrics.h b/be/src/util/starrocks_metrics.h index bae7475109751..ef928499f1078 100644 --- a/be/src/util/starrocks_metrics.h +++ b/be/src/util/starrocks_metrics.h @@ -67,6 +67,25 @@ class IntGaugeMetricsMap { StarRocksMetrics::instance()->metrics()->register_hook( \ #name, [&]() { StarRocksMetrics::instance()->name.set_value(func()); }); +#define METRICS_DEFINE_THREAD_POOL(threadpool_name) \ + METRIC_DEFINE_UINT_GAUGE(threadpool_name##_threadpool_size, MetricUnit::NOUNIT); \ + METRIC_DEFINE_UINT_GAUGE(threadpool_name##_executed_tasks_total, MetricUnit::NOUNIT); \ + METRIC_DEFINE_UINT_GAUGE(threadpool_name##_pending_time_ns_total, MetricUnit::NANOSECONDS); \ + METRIC_DEFINE_UINT_GAUGE(threadpool_name##_execute_time_ns_total, MetricUnit::NANOSECONDS); \ + METRIC_DEFINE_UINT_GAUGE(threadpool_name##_queue_count, MetricUnit::NOUNIT) + +#define REGISTER_THREAD_POOL_METRICS(name, threadpool) \ + do { \ + REGISTER_GAUGE_STARROCKS_METRIC(name##_threadpool_size, [this]() { return threadpool->max_threads(); }) \ + REGISTER_GAUGE_STARROCKS_METRIC(name##_executed_tasks_total, \ + [this]() { return threadpool->total_executed_tasks(); }) \ + REGISTER_GAUGE_STARROCKS_METRIC(name##_pending_time_ns_total, \ + [this]() { return threadpool->total_pending_time_ns(); }) \ + REGISTER_GAUGE_STARROCKS_METRIC(name##_execute_time_ns_total, \ + [this]() { return threadpool->total_execute_time_ns(); }) \ + REGISTER_GAUGE_STARROCKS_METRIC(name##_queue_count, [this]() { return threadpool->num_queued_tasks(); }) \ + } while (false) + class StarRocksMetrics { public: // query execution @@ -254,14 +273,14 @@ class StarRocksMetrics { METRIC_DEFINE_UINT_GAUGE(brpc_endpoint_stub_count, MetricUnit::NOUNIT); METRIC_DEFINE_UINT_GAUGE(tablet_writer_count, MetricUnit::NOUNIT); - // queue task count of thread pool - METRIC_DEFINE_UINT_GAUGE(publish_version_queue_count, MetricUnit::NOUNIT); - METRIC_DEFINE_UINT_GAUGE(async_delta_writer_queue_count, MetricUnit::NOUNIT); - METRIC_DEFINE_UINT_GAUGE(memtable_flush_queue_count, MetricUnit::NOUNIT); - METRIC_DEFINE_UINT_GAUGE(segment_replicate_queue_count, MetricUnit::NOUNIT); - METRIC_DEFINE_UINT_GAUGE(segment_flush_queue_count, MetricUnit::NOUNIT); - METRIC_DEFINE_UINT_GAUGE(update_apply_queue_count, MetricUnit::NOUNIT); - METRIC_DEFINE_UINT_GAUGE(pk_index_compaction_queue_count, MetricUnit::NOUNIT); + // thread pool metrics + METRICS_DEFINE_THREAD_POOL(publish_version); + METRICS_DEFINE_THREAD_POOL(async_delta_writer); + METRICS_DEFINE_THREAD_POOL(memtable_flush); + METRICS_DEFINE_THREAD_POOL(segment_replicate); + METRICS_DEFINE_THREAD_POOL(segment_flush); + METRICS_DEFINE_THREAD_POOL(update_apply); + METRICS_DEFINE_THREAD_POOL(pk_index_compaction); METRIC_DEFINE_UINT_GAUGE(load_rpc_threadpool_size, MetricUnit::NOUNIT); diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index a24bad9ad6d19..ee1102d46e744 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -558,6 +558,7 @@ void ThreadPool::dispatch_thread() { l.unlock(); + MonoTime start_time = MonoTime::Now(); // Execute the task task.runnable->run(); current_thread->inc_finished_tasks(); @@ -569,6 +570,12 @@ void ThreadPool::dispatch_thread() { // In the worst case, the destructor might even try to do something // with this threadpool, and produce a deadlock. task.runnable.reset(); + MonoTime finish_time = MonoTime::Now(); + + _total_executed_tasks.increment(1); + _total_pending_time_ns.increment(start_time.GetDeltaSince(task.submit_time).ToNanoseconds()); + _total_execute_time_ns.increment(finish_time.GetDeltaSince(start_time).ToNanoseconds()); + l.lock(); _last_active_timestamp = MonoTime::Now(); diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index d1119f6bc7bec..4c7e94947a165 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -53,6 +53,7 @@ #include "util/bthreads/semaphore.h" // resolve `barrier` macro conflicts with boost/thread.hpp header file #undef barrier +#include "util/metrics.h" #include "util/monotime.h" #include "util/priority_queue.h" @@ -248,6 +249,12 @@ class ThreadPool { int max_threads() const { return _max_threads.load(std::memory_order_acquire); } + int64_t total_executed_tasks() const { return _total_executed_tasks.value(); } + + int64_t total_pending_time_ns() const { return _total_pending_time_ns.value(); } + + int64_t total_execute_time_ns() const { return _total_execute_time_ns.value(); } + private: friend class ThreadPoolBuilder; friend class ThreadPoolToken; @@ -372,6 +379,15 @@ class ThreadPool { // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. std::unique_ptr _tokenless; + // Total number of tasks that have finished + CoreLocalCounter _total_executed_tasks{MetricUnit::NOUNIT}; + + // Total time in nanoseconds that tasks pending in the queue. + CoreLocalCounter _total_pending_time_ns{MetricUnit::NOUNIT}; + + // Total time in nanoseconds to execute tasks. + CoreLocalCounter _total_execute_time_ns{MetricUnit::NOUNIT}; + ThreadPool(const ThreadPool&) = delete; const ThreadPool& operator=(const ThreadPool&) = delete; }; diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index fec6381acf0d4..dbb9469a1d997 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -364,6 +364,7 @@ set(EXEC_FILES ./util/cidr_test.cpp ./util/coding_test.cpp ./util/core_local_test.cpp + ./util/core_local_counter_test.cpp ./util/countdown_latch_test.cpp ./util/crc32c_test.cpp ./util/dynamic_cache_test.cpp diff --git a/be/test/util/core_local_counter_test.cpp b/be/test/util/core_local_counter_test.cpp new file mode 100644 index 0000000000000..dfb682af5d75e --- /dev/null +++ b/be/test/util/core_local_counter_test.cpp @@ -0,0 +1,132 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include "util/metrics.h" + +namespace starrocks { + +const size_t OPS_PER_THREAD = 500000; + +static void* thread_counter(void* arg) { + bvar::Adder* reducer = (bvar::Adder*)arg; + butil::Timer timer; + timer.start(); + for (size_t i = 0; i < OPS_PER_THREAD; ++i) { + (*reducer) << 2; + } + timer.stop(); + return (void*)(timer.n_elapsed()); +} + +void* add_atomic(void* arg) { + butil::atomic* counter = (butil::atomic*)arg; + butil::Timer timer; + timer.start(); + for (size_t i = 0; i < OPS_PER_THREAD / 100; ++i) { + counter->fetch_add(2, butil::memory_order_relaxed); + } + timer.stop(); + return (void*)(timer.n_elapsed()); +} + +static long start_perf_test_with_atomic(size_t num_thread) { + butil::atomic counter(0); + pthread_t threads[num_thread]; + for (size_t i = 0; i < num_thread; ++i) { + pthread_create(&threads[i], nullptr, &add_atomic, (void*)&counter); + } + long totol_time = 0; + for (size_t i = 0; i < num_thread; ++i) { + void* ret; + pthread_join(threads[i], &ret); + totol_time += (long)ret; + } + long avg_time = totol_time / (OPS_PER_THREAD / 100 * num_thread); + EXPECT_EQ(2ul * num_thread * OPS_PER_THREAD / 100, counter.load()); + return avg_time; +} + +static long start_perf_test_with_adder(size_t num_thread) { + bvar::Adder reducer; + EXPECT_TRUE(reducer.valid()); + pthread_t threads[num_thread]; + for (size_t i = 0; i < num_thread; ++i) { + pthread_create(&threads[i], nullptr, &thread_counter, (void*)&reducer); + } + long totol_time = 0; + for (size_t i = 0; i < num_thread; ++i) { + void* ret = nullptr; + pthread_join(threads[i], &ret); + totol_time += (long)ret; + } + long avg_time = totol_time / (OPS_PER_THREAD * num_thread); + EXPECT_EQ(2ul * num_thread * OPS_PER_THREAD, reducer.get_value()); + return avg_time; +} + +static void* core_local_counter(void* arg) { + CoreLocalCounter* counter = (CoreLocalCounter*)arg; + butil::Timer timer; + timer.start(); + for (size_t i = 0; i < OPS_PER_THREAD; ++i) { + counter->increment(2); + } + timer.stop(); + return (void*)(timer.n_elapsed()); +} + +static long start_perf_test_with_core_local(size_t num_thread) { + CoreLocalCounter counter(MetricUnit::NOUNIT); + pthread_t threads[num_thread]; + for (size_t i = 0; i < num_thread; ++i) { + pthread_create(&threads[i], nullptr, &core_local_counter, (void*)&counter); + } + long totol_time = 0; + for (size_t i = 0; i < num_thread; ++i) { + void* ret = nullptr; + pthread_join(threads[i], &ret); + totol_time += (long)ret; + } + long avg_time = totol_time / (OPS_PER_THREAD * num_thread); + EXPECT_EQ(2ul * num_thread * OPS_PER_THREAD, counter.value()); + return avg_time; +} + +// Compare the performance among bvar, atomic and CoreLocalCounter. +// You should build the test with BUILD_TYPE=release. The way to test +// is same as that of brpc https://github.com/apache/brpc/blob/1.3.0/test/bvar_reducer_unittest.cpp#L124 +TEST(CoreLocalCounterTest, DISABLED_test_perf) { + std::ostringstream oss; + for (size_t i = 1; i <= 24; ++i) { + oss << i << '\t' << start_perf_test_with_adder(i) << '\n'; + } + LOG(INFO) << "Adder performance:\n" << oss.str(); + oss.str(""); + for (size_t i = 1; i <= 24; ++i) { + oss << i << '\t' << start_perf_test_with_core_local(i) << '\n'; + } + LOG(INFO) << "CoreLocal performance:\n" << oss.str(); + oss.str(""); + for (size_t i = 1; i <= 24; ++i) { + oss << i << '\t' << start_perf_test_with_atomic(i) << '\n'; + } + LOG(INFO) << "Atomic performance:\n" << oss.str(); +} + +} // namespace starrocks diff --git a/be/test/util/starrocks_metrics_test.cpp b/be/test/util/starrocks_metrics_test.cpp index b04be50667278..47a001938b6a1 100644 --- a/be/test/util/starrocks_metrics_test.cpp +++ b/be/test/util/starrocks_metrics_test.cpp @@ -302,6 +302,14 @@ TEST_F(StarRocksMetricsTest, PageCacheMetrics) { ASSERT_STREQ(std::to_string(cache->get_capacity()).c_str(), capacity_metric->to_string().c_str()); } +void assert_threadpool_metrics_register(const std::string& pool_name, MetricRegistry* instance) { + ASSERT_TRUE(instance->get_metric(pool_name + "_threadpool_size") != nullptr); + ASSERT_TRUE(instance->get_metric(pool_name + "_executed_tasks_total") != nullptr); + ASSERT_TRUE(instance->get_metric(pool_name + "_pending_time_ns_total") != nullptr); + ASSERT_TRUE(instance->get_metric(pool_name + "_execute_time_ns_total") != nullptr); + ASSERT_TRUE(instance->get_metric(pool_name + "_queue_count") != nullptr); +} + TEST_F(StarRocksMetricsTest, test_metrics_register) { auto instance = StarRocksMetrics::instance()->metrics(); ASSERT_NE(nullptr, instance->get_metric("memtable_flush_total")); @@ -313,6 +321,12 @@ TEST_F(StarRocksMetricsTest, test_metrics_register) { ASSERT_NE(nullptr, instance->get_metric("segment_flush_duration_us")); ASSERT_NE(nullptr, instance->get_metric("segment_flush_io_time_us")); ASSERT_NE(nullptr, instance->get_metric("segment_flush_bytes_total")); + assert_threadpool_metrics_register("async_delta_writer", instance); + assert_threadpool_metrics_register("memtable_flush", instance); + assert_threadpool_metrics_register("segment_replicate", instance); + assert_threadpool_metrics_register("segment_flush", instance); + assert_threadpool_metrics_register("update_apply", instance); + assert_threadpool_metrics_register("pk_index_compaction", instance); } } // namespace starrocks