Skip to content

Commit

Permalink
[Enhancement] Add thread pool metrics (backport #40171) (backport #42351
Browse files Browse the repository at this point in the history
) (#42683)

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Mar 15, 2024
1 parent 35ea35a commit 47b8d4e
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 27 deletions.
3 changes: 1 addition & 2 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ void AgentServer::Impl::init_or_die() {
BUILD_DYNAMIC_TASK_THREAD_POOL("publish_version", MIN_TRANSACTION_PUBLISH_WORKER_COUNT,
max_publish_version_worker_count, DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE,
_thread_pool_publish_version);
REGISTER_GAUGE_STARROCKS_METRIC(publish_version_queue_count,
[this]() { return _thread_pool_publish_version->num_queued_tasks(); });
REGISTER_THREAD_POOL_METRICS(publish_version, _thread_pool_publish_version);
#endif

BUILD_DYNAMIC_TASK_THREAD_POOL("drop", 1, config::drop_tablet_worker_count, std::numeric_limits<int>::max(),
Expand Down
3 changes: 1 addition & 2 deletions be/src/storage/persistent_index_compaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ Status PersistentIndexCompactionManager::init() {
.set_min_threads(1)
.set_max_threads(max_pk_index_compaction_thread_cnt)
.build(&_worker_thread_pool));
REGISTER_GAUGE_STARROCKS_METRIC(pk_index_compaction_queue_count,
[this]() { return _worker_thread_pool->num_queued_tasks(); });
REGISTER_THREAD_POOL_METRICS(pk_index_compaction, _worker_thread_pool);

return Status::OK();
}
Expand Down
19 changes: 6 additions & 13 deletions be/src/storage/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,28 +217,21 @@ Status StorageEngine::_open(const EngineOptions& options) {

_async_delta_writer_executor =
std::make_unique<bthreads::ThreadPoolExecutor>(thread_pool.release(), kTakesOwnership);
REGISTER_GAUGE_STARROCKS_METRIC(async_delta_writer_queue_count, [this]() {
return static_cast<bthreads::ThreadPoolExecutor*>(_async_delta_writer_executor.get())
->get_thread_pool()
->num_queued_tasks();
})
REGISTER_THREAD_POOL_METRICS(
async_delta_writer,
static_cast<bthreads::ThreadPoolExecutor*>(_async_delta_writer_executor.get())->get_thread_pool());

_memtable_flush_executor = std::make_unique<MemTableFlushExecutor>();
RETURN_IF_ERROR_WITH_WARN(_memtable_flush_executor->init(dirs), "init MemTableFlushExecutor failed");
REGISTER_GAUGE_STARROCKS_METRIC(memtable_flush_queue_count, [this]() {
return _memtable_flush_executor->get_thread_pool()->num_queued_tasks();
})
REGISTER_THREAD_POOL_METRICS(memtable_flush, _memtable_flush_executor->get_thread_pool());

_segment_flush_executor = std::make_unique<SegmentFlushExecutor>();
RETURN_IF_ERROR_WITH_WARN(_segment_flush_executor->init(dirs), "init SegmentFlushExecutor failed");
REGISTER_GAUGE_STARROCKS_METRIC(segment_flush_queue_count,
[this]() { return _segment_flush_executor->get_thread_pool()->num_queued_tasks(); })
REGISTER_THREAD_POOL_METRICS(segment_flush, _segment_flush_executor->get_thread_pool());

_segment_replicate_executor = std::make_unique<SegmentReplicateExecutor>();
RETURN_IF_ERROR_WITH_WARN(_segment_replicate_executor->init(dirs), "init SegmentReplicateExecutor failed");
REGISTER_GAUGE_STARROCKS_METRIC(segment_replicate_queue_count, [this]() {
return _segment_replicate_executor->get_thread_pool()->num_queued_tasks();
})
REGISTER_THREAD_POOL_METRICS(segment_replicate, _segment_replicate_executor->get_thread_pool());

RETURN_IF_ERROR_WITH_WARN(_replication_txn_manager->init(dirs), "init ReplicationTxnManager failed");

Expand Down
3 changes: 1 addition & 2 deletions be/src/storage/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ Status UpdateManager::init() {
max_thread_cnt = config::transaction_apply_worker_count;
}
RETURN_IF_ERROR(ThreadPoolBuilder("update_apply").set_max_threads(max_thread_cnt).build(&_apply_thread_pool));
REGISTER_GAUGE_STARROCKS_METRIC(update_apply_queue_count,
[this]() { return _apply_thread_pool->num_queued_tasks(); });
REGISTER_THREAD_POOL_METRICS(update_apply, _apply_thread_pool);

int max_get_thread_cnt =
config::get_pindex_worker_count > max_thread_cnt ? config::get_pindex_worker_count : max_thread_cnt * 2;
Expand Down
35 changes: 27 additions & 8 deletions be/src/util/starrocks_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,25 @@ class IntGaugeMetricsMap {
StarRocksMetrics::instance()->metrics()->register_hook( \
#name, [&]() { StarRocksMetrics::instance()->name.set_value(func()); });

#define METRICS_DEFINE_THREAD_POOL(threadpool_name) \
METRIC_DEFINE_UINT_GAUGE(threadpool_name##_threadpool_size, MetricUnit::NOUNIT); \
METRIC_DEFINE_UINT_GAUGE(threadpool_name##_executed_tasks_total, MetricUnit::NOUNIT); \
METRIC_DEFINE_UINT_GAUGE(threadpool_name##_pending_time_ns_total, MetricUnit::NANOSECONDS); \
METRIC_DEFINE_UINT_GAUGE(threadpool_name##_execute_time_ns_total, MetricUnit::NANOSECONDS); \
METRIC_DEFINE_UINT_GAUGE(threadpool_name##_queue_count, MetricUnit::NOUNIT)

#define REGISTER_THREAD_POOL_METRICS(name, threadpool) \
do { \
REGISTER_GAUGE_STARROCKS_METRIC(name##_threadpool_size, [this]() { return threadpool->max_threads(); }) \
REGISTER_GAUGE_STARROCKS_METRIC(name##_executed_tasks_total, \
[this]() { return threadpool->total_executed_tasks(); }) \
REGISTER_GAUGE_STARROCKS_METRIC(name##_pending_time_ns_total, \
[this]() { return threadpool->total_pending_time_ns(); }) \
REGISTER_GAUGE_STARROCKS_METRIC(name##_execute_time_ns_total, \
[this]() { return threadpool->total_execute_time_ns(); }) \
REGISTER_GAUGE_STARROCKS_METRIC(name##_queue_count, [this]() { return threadpool->num_queued_tasks(); }) \
} while (false)

class StarRocksMetrics {
public:
// query execution
Expand Down Expand Up @@ -254,14 +273,14 @@ class StarRocksMetrics {
METRIC_DEFINE_UINT_GAUGE(brpc_endpoint_stub_count, MetricUnit::NOUNIT);
METRIC_DEFINE_UINT_GAUGE(tablet_writer_count, MetricUnit::NOUNIT);

// queue task count of thread pool
METRIC_DEFINE_UINT_GAUGE(publish_version_queue_count, MetricUnit::NOUNIT);
METRIC_DEFINE_UINT_GAUGE(async_delta_writer_queue_count, MetricUnit::NOUNIT);
METRIC_DEFINE_UINT_GAUGE(memtable_flush_queue_count, MetricUnit::NOUNIT);
METRIC_DEFINE_UINT_GAUGE(segment_replicate_queue_count, MetricUnit::NOUNIT);
METRIC_DEFINE_UINT_GAUGE(segment_flush_queue_count, MetricUnit::NOUNIT);
METRIC_DEFINE_UINT_GAUGE(update_apply_queue_count, MetricUnit::NOUNIT);
METRIC_DEFINE_UINT_GAUGE(pk_index_compaction_queue_count, MetricUnit::NOUNIT);
// thread pool metrics
METRICS_DEFINE_THREAD_POOL(publish_version);
METRICS_DEFINE_THREAD_POOL(async_delta_writer);
METRICS_DEFINE_THREAD_POOL(memtable_flush);
METRICS_DEFINE_THREAD_POOL(segment_replicate);
METRICS_DEFINE_THREAD_POOL(segment_flush);
METRICS_DEFINE_THREAD_POOL(update_apply);
METRICS_DEFINE_THREAD_POOL(pk_index_compaction);

METRIC_DEFINE_UINT_GAUGE(load_rpc_threadpool_size, MetricUnit::NOUNIT);

Expand Down
7 changes: 7 additions & 0 deletions be/src/util/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ void ThreadPool::dispatch_thread() {

l.unlock();

MonoTime start_time = MonoTime::Now();
// Execute the task
task.runnable->run();
current_thread->inc_finished_tasks();
Expand All @@ -569,6 +570,12 @@ void ThreadPool::dispatch_thread() {
// In the worst case, the destructor might even try to do something
// with this threadpool, and produce a deadlock.
task.runnable.reset();
MonoTime finish_time = MonoTime::Now();

_total_executed_tasks.increment(1);
_total_pending_time_ns.increment(start_time.GetDeltaSince(task.submit_time).ToNanoseconds());
_total_execute_time_ns.increment(finish_time.GetDeltaSince(start_time).ToNanoseconds());

l.lock();
_last_active_timestamp = MonoTime::Now();

Expand Down
16 changes: 16 additions & 0 deletions be/src/util/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "util/bthreads/semaphore.h"
// resolve `barrier` macro conflicts with boost/thread.hpp header file
#undef barrier
#include "util/metrics.h"
#include "util/monotime.h"
#include "util/priority_queue.h"

Expand Down Expand Up @@ -248,6 +249,12 @@ class ThreadPool {

int max_threads() const { return _max_threads.load(std::memory_order_acquire); }

int64_t total_executed_tasks() const { return _total_executed_tasks.value(); }

int64_t total_pending_time_ns() const { return _total_pending_time_ns.value(); }

int64_t total_execute_time_ns() const { return _total_execute_time_ns.value(); }

private:
friend class ThreadPoolBuilder;
friend class ThreadPoolToken;
Expand Down Expand Up @@ -372,6 +379,15 @@ class ThreadPool {
// ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
std::unique_ptr<ThreadPoolToken> _tokenless;

// Total number of tasks that have finished
CoreLocalCounter<int64_t> _total_executed_tasks{MetricUnit::NOUNIT};

// Total time in nanoseconds that tasks pending in the queue.
CoreLocalCounter<int64_t> _total_pending_time_ns{MetricUnit::NOUNIT};

// Total time in nanoseconds to execute tasks.
CoreLocalCounter<int64_t> _total_execute_time_ns{MetricUnit::NOUNIT};

ThreadPool(const ThreadPool&) = delete;
const ThreadPool& operator=(const ThreadPool&) = delete;
};
Expand Down
1 change: 1 addition & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ set(EXEC_FILES
./util/cidr_test.cpp
./util/coding_test.cpp
./util/core_local_test.cpp
./util/core_local_counter_test.cpp
./util/countdown_latch_test.cpp
./util/crc32c_test.cpp
./util/dynamic_cache_test.cpp
Expand Down
132 changes: 132 additions & 0 deletions be/test/util/core_local_counter_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <bvar/bvar.h>
#include <gtest/gtest.h>

#include <atomic>

#include "util/metrics.h"

namespace starrocks {

const size_t OPS_PER_THREAD = 500000;

static void* thread_counter(void* arg) {
bvar::Adder<uint64_t>* reducer = (bvar::Adder<uint64_t>*)arg;
butil::Timer timer;
timer.start();
for (size_t i = 0; i < OPS_PER_THREAD; ++i) {
(*reducer) << 2;
}
timer.stop();
return (void*)(timer.n_elapsed());
}

void* add_atomic(void* arg) {
butil::atomic<uint64_t>* counter = (butil::atomic<uint64_t>*)arg;
butil::Timer timer;
timer.start();
for (size_t i = 0; i < OPS_PER_THREAD / 100; ++i) {
counter->fetch_add(2, butil::memory_order_relaxed);
}
timer.stop();
return (void*)(timer.n_elapsed());
}

static long start_perf_test_with_atomic(size_t num_thread) {
butil::atomic<uint64_t> counter(0);
pthread_t threads[num_thread];
for (size_t i = 0; i < num_thread; ++i) {
pthread_create(&threads[i], nullptr, &add_atomic, (void*)&counter);
}
long totol_time = 0;
for (size_t i = 0; i < num_thread; ++i) {
void* ret;
pthread_join(threads[i], &ret);
totol_time += (long)ret;
}
long avg_time = totol_time / (OPS_PER_THREAD / 100 * num_thread);
EXPECT_EQ(2ul * num_thread * OPS_PER_THREAD / 100, counter.load());
return avg_time;
}

static long start_perf_test_with_adder(size_t num_thread) {
bvar::Adder<uint64_t> reducer;
EXPECT_TRUE(reducer.valid());
pthread_t threads[num_thread];
for (size_t i = 0; i < num_thread; ++i) {
pthread_create(&threads[i], nullptr, &thread_counter, (void*)&reducer);
}
long totol_time = 0;
for (size_t i = 0; i < num_thread; ++i) {
void* ret = nullptr;
pthread_join(threads[i], &ret);
totol_time += (long)ret;
}
long avg_time = totol_time / (OPS_PER_THREAD * num_thread);
EXPECT_EQ(2ul * num_thread * OPS_PER_THREAD, reducer.get_value());
return avg_time;
}

static void* core_local_counter(void* arg) {
CoreLocalCounter<int64_t>* counter = (CoreLocalCounter<int64_t>*)arg;
butil::Timer timer;
timer.start();
for (size_t i = 0; i < OPS_PER_THREAD; ++i) {
counter->increment(2);
}
timer.stop();
return (void*)(timer.n_elapsed());
}

static long start_perf_test_with_core_local(size_t num_thread) {
CoreLocalCounter<int64_t> counter(MetricUnit::NOUNIT);
pthread_t threads[num_thread];
for (size_t i = 0; i < num_thread; ++i) {
pthread_create(&threads[i], nullptr, &core_local_counter, (void*)&counter);
}
long totol_time = 0;
for (size_t i = 0; i < num_thread; ++i) {
void* ret = nullptr;
pthread_join(threads[i], &ret);
totol_time += (long)ret;
}
long avg_time = totol_time / (OPS_PER_THREAD * num_thread);
EXPECT_EQ(2ul * num_thread * OPS_PER_THREAD, counter.value());
return avg_time;
}

// Compare the performance among bvar, atomic and CoreLocalCounter.
// You should build the test with BUILD_TYPE=release. The way to test
// is same as that of brpc https://github.com/apache/brpc/blob/1.3.0/test/bvar_reducer_unittest.cpp#L124
TEST(CoreLocalCounterTest, DISABLED_test_perf) {
std::ostringstream oss;
for (size_t i = 1; i <= 24; ++i) {
oss << i << '\t' << start_perf_test_with_adder(i) << '\n';
}
LOG(INFO) << "Adder performance:\n" << oss.str();
oss.str("");
for (size_t i = 1; i <= 24; ++i) {
oss << i << '\t' << start_perf_test_with_core_local(i) << '\n';
}
LOG(INFO) << "CoreLocal performance:\n" << oss.str();
oss.str("");
for (size_t i = 1; i <= 24; ++i) {
oss << i << '\t' << start_perf_test_with_atomic(i) << '\n';
}
LOG(INFO) << "Atomic performance:\n" << oss.str();
}

} // namespace starrocks
14 changes: 14 additions & 0 deletions be/test/util/starrocks_metrics_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ TEST_F(StarRocksMetricsTest, PageCacheMetrics) {
ASSERT_STREQ(std::to_string(cache->get_capacity()).c_str(), capacity_metric->to_string().c_str());
}

void assert_threadpool_metrics_register(const std::string& pool_name, MetricRegistry* instance) {
ASSERT_TRUE(instance->get_metric(pool_name + "_threadpool_size") != nullptr);
ASSERT_TRUE(instance->get_metric(pool_name + "_executed_tasks_total") != nullptr);
ASSERT_TRUE(instance->get_metric(pool_name + "_pending_time_ns_total") != nullptr);
ASSERT_TRUE(instance->get_metric(pool_name + "_execute_time_ns_total") != nullptr);
ASSERT_TRUE(instance->get_metric(pool_name + "_queue_count") != nullptr);
}

TEST_F(StarRocksMetricsTest, test_metrics_register) {
auto instance = StarRocksMetrics::instance()->metrics();
ASSERT_NE(nullptr, instance->get_metric("memtable_flush_total"));
Expand All @@ -313,6 +321,12 @@ TEST_F(StarRocksMetricsTest, test_metrics_register) {
ASSERT_NE(nullptr, instance->get_metric("segment_flush_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("segment_flush_io_time_us"));
ASSERT_NE(nullptr, instance->get_metric("segment_flush_bytes_total"));
assert_threadpool_metrics_register("async_delta_writer", instance);
assert_threadpool_metrics_register("memtable_flush", instance);
assert_threadpool_metrics_register("segment_replicate", instance);
assert_threadpool_metrics_register("segment_flush", instance);
assert_threadpool_metrics_register("update_apply", instance);
assert_threadpool_metrics_register("pk_index_compaction", instance);
}

} // namespace starrocks

0 comments on commit 47b8d4e

Please sign in to comment.