Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 110 additions & 2 deletions be/src/exec/workgroup/mem_tracker_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ MemTrackerPtr MemTrackerManager::register_workgroup(const WorkGroupPtr& wg) {
return GlobalEnv::GetInstance()->query_pool_mem_tracker_shared();
}

std::unique_lock write_lock(_mutex);

const double mem_limit_fraction = wg->mem_limit();
const int64_t memory_limit_bytes =
static_cast<int64_t>(GlobalEnv::GetInstance()->query_pool_mem_tracker()->limit() * mem_limit_fraction);
Expand Down Expand Up @@ -54,6 +56,7 @@ MemTrackerPtr MemTrackerManager::register_workgroup(const WorkGroupPtr& wg) {
_shared_mem_trackers[wg->mem_pool()].tracker = shared_mem_tracker;
_shared_mem_trackers[wg->mem_pool()].child_count++; // also handles orphaned children

_add_metrics_unlocked(wg->mem_pool(), write_lock);
return shared_mem_tracker;
}

Expand All @@ -62,6 +65,8 @@ void MemTrackerManager::deregister_workgroup(const std::string& mem_pool) {
return;
}

std::unique_lock write_lock(_mutex);

if (_shared_mem_trackers.contains(mem_pool)) {
MemTrackerInfo& tracker_info = _shared_mem_trackers.at(mem_pool);
if (tracker_info.child_count == 1) {
Expand All @@ -74,13 +79,116 @@ void MemTrackerManager::deregister_workgroup(const std::string& mem_pool) {
}
}

void MemTrackerManager::_add_metrics_unlocked(const std::string& mem_pool, UniqueLockType& lock) {
std::call_once(_register_metrics_hook_once_flag, [this] {
StarRocksMetrics::instance()->metrics()->register_hook("mem_pool_metrics_hook", [this] { _update_metrics(); });
});

if (_shared_mem_trackers_metrics.contains(mem_pool)) {
return;
}

// We have to free the write_lock during metrics registration to prevent a deadlock with the metrics collector.
// The collector will first claim MetricsRegistry::mutex and then MemTrackerManager::mutex due to _update_metrics()
// callback hook. At this point in the code, we are holding the MemTrackerManager::mutex and will try to claim the
// MetricsRegistry::mutex after calling register_metric. Without unlocking, we would run into a deadlock.
lock.unlock();

auto* registry = StarRocksMetrics::instance()->metrics();

auto mem_limit = std::make_unique<IntGauge>(MetricUnit::BYTES);
const bool mem_limit_registered = registry->register_metric("mem_pool_mem_limit_bytes",
MetricLabels().add("name", mem_pool), mem_limit.get());

auto mem_usage_bytes = std::make_unique<IntGauge>(MetricUnit::BYTES);
const bool mem_usage_bytes_registered = registry->register_metric(
"mem_pool_mem_usage_bytes", MetricLabels().add("name", mem_pool), mem_usage_bytes.get());

auto mem_usage_ratio = std::make_unique<DoubleGauge>(MetricUnit::PERCENT);
const bool mem_usage_ratio_registered = registry->register_metric(
"mem_pool_mem_usage_ratio", MetricLabels().add("name", mem_pool), mem_usage_ratio.get());

auto workgroup_count = std::make_unique<IntGauge>(MetricUnit::NOUNIT);
const bool workgroup_count_registered = registry->register_metric(
"mem_pool_workgroup_count", MetricLabels().add("name", mem_pool), workgroup_count.get());

lock.lock();

// During the unlocked period above, another thread might have already inserted a metrics entry. Use it if exists.
const std::shared_ptr<MemTrackerMetrics>& metrics =
_shared_mem_trackers_metrics.contains(mem_pool)
? _shared_mem_trackers_metrics.at(mem_pool)
: _shared_mem_trackers_metrics.emplace(mem_pool, std::make_shared<MemTrackerMetrics>())
.first->second;

// During the unlocked period above, other threads might have tried to register the same metrics and succeeded in
// some of them. register_metric will return false if you try to register a metric with the same name and labels
// that is already registered. Therefore, we only std::move the individual metrics which this thread registered.
if (mem_limit_registered) {
metrics->mem_limit = std::move(mem_limit);
}
if (mem_usage_bytes_registered) {
metrics->mem_usage_bytes = std::move(mem_usage_bytes);
}
if (mem_usage_ratio_registered) {
metrics->mem_usage_ratio = std::move(mem_usage_ratio);
}
if (workgroup_count_registered) {
metrics->workgroup_count = std::move(workgroup_count);
}

// Note: It is also theoretically possible that the mem_tracker was deleted during the unlocked period.
// We are ok with leaking a metric in this case and do not try to unregister.
}

void MemTrackerManager::_update_metrics() {
std::shared_lock read_lock(_mutex);
_update_metrics_unlocked(read_lock);
}

void MemTrackerManager::_update_metrics_unlocked(SharedLockType&) {
const auto divide = [](const int64_t a, const int64_t b) { return b <= 0 ? 0.0 : static_cast<double>(a) / b; };

for (auto& [mem_pool, metrics] : _shared_mem_trackers_metrics) {
if (const auto it = _shared_mem_trackers.find(mem_pool); it != _shared_mem_trackers.end()) {
const auto& [mem_tracker, child_count] = it->second;
if (metrics->mem_limit) {
metrics->mem_limit->set_value(mem_tracker->limit());
}
if (metrics->mem_usage_bytes) {
metrics->mem_usage_bytes->set_value(mem_tracker->consumption());
}
if (metrics->mem_usage_ratio) {
metrics->mem_usage_ratio->set_value(divide(mem_tracker->consumption(), mem_tracker->limit()));
}
if (metrics->workgroup_count) {
metrics->workgroup_count->set_value(child_count);
}
} else {
// Metrics entries for deleted shared_mem_trackers are never deleted, but simply set to 0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a long-running system with frequent workgroup creation and deletion, will these garbage metrics accumulate, occupying memory and adding long, useless serialization to the /metrics interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really.

Workgroups by default belong to DEFAULT_MEM_POOL , these are not tracked in MemTrackerManager and it does not report mempool statistics for such work groups. Frequent creation and deletion of workgroups under a memory pool is a special case.

In the general case, we are already paying the price of not deleting the metrics for workgroups. Compare my implementation with _update_metrics_unlocked() method in work_group.cpp:

} else {
VLOG(2) << "workgroup update_metrics " << name << ", workgroup not exists so cleanup metrics";
wg_metrics->cpu_limit->set_value(0);
wg_metrics->inuse_cpu_ratio->set_value(0);
wg_metrics->inuse_scan_ratio->set_value(0);
wg_metrics->inuse_connector_scan_ratio->set_value(0);
wg_metrics->mem_limit->set_value(0);
wg_metrics->inuse_mem_bytes->set_value(0);
wg_metrics->connector_scan_mem_bytes->set_value(0);
wg_metrics->running_queries->set_value(0);
wg_metrics->total_queries->set_value(0);
wg_metrics->concurrency_overflow_count->set_value(0);
wg_metrics->bigquery_count->set_value(0);
wg_metrics->inuse_cpu_cores->set_value(0);
}

Your concern about garbage metrics accumulating applies here even more. I believe the reason it was implemented this way is that deleting metrics entries makes handling race conditions more complicated and error-prone.

I can change my implementation so that unused metrics are deleted instead of being set to 0. However, I am not sure if it is worth the extra effort as work_group.cpp does not delete its own metrics anyway.

Let me know which one you prefer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. will see if other reviewers have some thought. I am ok to keep as is for now.

if (metrics->mem_limit) {
metrics->mem_limit->set_value(0);
}
if (metrics->mem_usage_bytes) {
metrics->mem_usage_bytes->set_value(0);
}
if (metrics->mem_usage_ratio) {
metrics->mem_usage_ratio->set_value(0);
}
if (metrics->workgroup_count) {
metrics->workgroup_count->set_value(0);
}
}
}
}

// Returns the names of the currently registered, non-default memory pools, i.e. the keys of _shared_mem_trackers.
// The default memory pool is not appended to the result.
//
// The manager mutex is held in shared (read) mode while the names are copied, so the result is a consistent snapshot;
// it may be stale by the time the caller inspects it. The order of the names is unspecified (unordered_map order).
std::vector<std::string> MemTrackerManager::list_mem_trackers() const {
    std::shared_lock read_lock(_mutex);

    std::vector<std::string> mem_trackers;
    // Exactly one name per tracked pool is emitted, so a single up-front allocation suffices.
    mem_trackers.reserve(_shared_mem_trackers.size());
    for (const auto& entry : _shared_mem_trackers) {
        mem_trackers.push_back(entry.first);
    }
    return mem_trackers;
}
} // namespace starrocks::workgroup
28 changes: 26 additions & 2 deletions be/src/exec/workgroup/mem_tracker_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,30 @@
#pragma once

#include <memory>
#include <shared_mutex>

#include "runtime/mem_tracker.h"
#include "work_group_fwd.h"

namespace starrocks::workgroup {

// Gauges exported for a single memory pool. Every member starts out null; a member is only populated once the
// corresponding metric has been registered, so readers must null-check each gauge before using it.
struct MemTrackerMetrics {
    // Memory limit of the pool, in bytes.
    std::unique_ptr<IntGauge> mem_limit;
    // Memory currently consumed by the pool, in bytes.
    std::unique_ptr<IntGauge> mem_usage_bytes;
    // Consumption divided by limit.
    std::unique_ptr<DoubleGauge> mem_usage_ratio;
    // Number of workgroups attached to the pool.
    std::unique_ptr<IntGauge> workgroup_count;
};

using MemTrackerPtr = std::shared_ptr<MemTracker>;
using MemTrackerMetricsPtr = std::shared_ptr<MemTrackerMetrics>;

// Bookkeeping for one shared memory tracker: the tracker itself plus the number of workgroups currently using it.
struct MemTrackerInfo {
    // Shared tracker for the memory pool; null until one has been assigned.
    MemTrackerPtr tracker;
    // Number of workgroups registered against `tracker`. Explicitly zero-initialized so that a default-initialized
    // MemTrackerInfo never carries an indeterminate count; the type remains an aggregate.
    uint32_t child_count = 0;
};

struct MemTrackerManager {
class MemTrackerManager {
public:
std::vector<std::string> list_mem_trackers() const;
/**
* Constructs and returns a shared_mem_tracker for the workgroup if one does not already exist.
* Otherwise, returns the existing instance and increments the number of tracked workgroups by one.
Expand All @@ -45,7 +53,23 @@ struct MemTrackerManager {
*/
void deregister_workgroup(const std::string& mem_pool);

/**
* Returns a list of currently registered non-default memory trackers.
*/
std::vector<std::string> list_mem_trackers() const;

private:
using MutexType = std::shared_mutex;
using UniqueLockType = std::unique_lock<MutexType>;
using SharedLockType = std::shared_lock<MutexType>;

void _add_metrics_unlocked(const std::string& mem_pool, UniqueLockType& lock);
void _update_metrics();
void _update_metrics_unlocked(SharedLockType&);

mutable std::shared_mutex _mutex;
std::once_flag _register_metrics_hook_once_flag;
std::unordered_map<std::string, MemTrackerInfo> _shared_mem_trackers{};
std::unordered_map<std::string, MemTrackerMetricsPtr> _shared_mem_trackers_metrics{};
};
} // namespace starrocks::workgroup
5 changes: 2 additions & 3 deletions be/test/exec/workgroup/work_group_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ PARALLEL_TEST(WorkGroupManagerTest, test_if_unused_memory_pools_are_cleaned_up)

_manager->apply(create_operations);

EXPECT_EQ(_manager->list_memory_pools().size(), 2);
EXPECT_EQ(_manager->list_memory_pools().size(), 1);

// Version must be strictly larger, otherwise workgroup will not be deleted
twg1.version++;
Expand All @@ -117,11 +117,10 @@ PARALLEL_TEST(WorkGroupManagerTest, test_if_unused_memory_pools_are_cleaned_up)
make_twg_op(twg2, TWorkGroupOpType::WORKGROUP_OP_DELETE)};

_manager->apply(delete_operations);
std::this_thread::sleep_for(std::chrono::seconds(1));
// The expired workgroups will only get erased in the next call to apply
_manager->apply({});

EXPECT_EQ(_manager->list_memory_pools().size(), 1);
EXPECT_EQ(_manager->list_memory_pools().size(), 0);
}
_manager->destroy();
}
Expand Down
24 changes: 24 additions & 0 deletions docs/en/administration/management/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,30 @@ For more information on how to build a monitoring service for your StarRocks clu
- Type: Instantaneous
- Description: Instantaneous value of resource group memory usage.

### starrocks_be_mem_pool_mem_limit_bytes

- Unit: Bytes
- Type: Instantaneous
- Description: Memory limit for each memory pool, measured in bytes.

### starrocks_be_mem_pool_mem_usage_bytes

- Unit: Bytes
- Type: Instantaneous
- Description: Total memory currently used by each memory pool, measured in bytes.

### starrocks_be_mem_pool_mem_usage_ratio

- Unit: -
- Type: Instantaneous
- Description: Ratio of the memory usage of the memory pool to the memory limit of the memory pool.

### starrocks_be_mem_pool_workgroup_count

- Unit: Count
- Type: Instantaneous
- Description: Number of resource groups assigned to each memory pool.

### starrocks_be_pipe_prepare_pool_queue_len

- Unit: Count
Expand Down
24 changes: 24 additions & 0 deletions docs/ja/administration/management/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,30 @@ StarRocks クラスタのモニタリングサービスの構築方法につい
- タイプ: Instantaneous
- 説明: リソースグループのメモリ使用量の瞬時値。

### starrocks_be_mem_pool_mem_limit_bytes

- 単位: Bytes
- タイプ: Instantaneous
- 説明: メモリプールのメモリクォータの瞬時値。

### starrocks_be_mem_pool_mem_usage_bytes

- 単位: Bytes
- タイプ: Instantaneous
- 説明: メモリプールのメモリ使用量の瞬時値。

### starrocks_be_mem_pool_mem_usage_ratio

- 単位: -
- タイプ: Instantaneous
- 説明: メモリプールのメモリクォータに対するメモリ使用量の比率。

### starrocks_be_mem_pool_workgroup_count

- 単位: Count
- タイプ: Instantaneous
- 説明: メモリプールに割り当てられたリソースグループ数の瞬時値。

### starrocks_be_pipe_prepare_pool_queue_len

- 単位: Count
Expand Down
24 changes: 24 additions & 0 deletions docs/zh/administration/management/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,30 @@ displayed_sidebar: docs
- 类型:瞬时值
- 描述:该资源组内存使用率瞬时值

### starrocks_be_mem_pool_mem_limit_bytes

- 单位:Byte
- 类型:瞬时值
- 描述:该内存池内存配额的瞬时值

### starrocks_be_mem_pool_mem_usage_bytes

- 单位:Byte
- 类型:瞬时值
- 描述:该内存池当前内存使用量的瞬时值

### starrocks_be_mem_pool_mem_usage_ratio

- 单位:-
- 类型:瞬时值
- 描述:该内存池内存使用量与内存配额比率的瞬时值

### starrocks_be_mem_pool_workgroup_count

- 单位:个
- 类型:瞬时值
- 描述:该内存池分配的资源组数量的瞬时值

### starrocks_be_pipe_prepare_pool_queue_len

- 单位:个
Expand Down
Loading