diff --git a/be/src/exec/workgroup/mem_tracker_manager.cpp b/be/src/exec/workgroup/mem_tracker_manager.cpp index 78a980e21fa841..eb542ba30df485 100644 --- a/be/src/exec/workgroup/mem_tracker_manager.cpp +++ b/be/src/exec/workgroup/mem_tracker_manager.cpp @@ -27,6 +27,8 @@ MemTrackerPtr MemTrackerManager::register_workgroup(const WorkGroupPtr& wg) { return GlobalEnv::GetInstance()->query_pool_mem_tracker_shared(); } + std::unique_lock write_lock(_mutex); + const double mem_limit_fraction = wg->mem_limit(); const int64_t memory_limit_bytes = static_cast(GlobalEnv::GetInstance()->query_pool_mem_tracker()->limit() * mem_limit_fraction); @@ -54,6 +56,7 @@ MemTrackerPtr MemTrackerManager::register_workgroup(const WorkGroupPtr& wg) { _shared_mem_trackers[wg->mem_pool()].tracker = shared_mem_tracker; _shared_mem_trackers[wg->mem_pool()].child_count++; // also handles orphaned children + _add_metrics_unlocked(wg->mem_pool(), write_lock); return shared_mem_tracker; } @@ -62,6 +65,8 @@ void MemTrackerManager::deregister_workgroup(const std::string& mem_pool) { return; } + std::unique_lock write_lock(_mutex); + if (_shared_mem_trackers.contains(mem_pool)) { MemTrackerInfo& tracker_info = _shared_mem_trackers.at(mem_pool); if (tracker_info.child_count == 1) { @@ -74,13 +79,116 @@ void MemTrackerManager::deregister_workgroup(const std::string& mem_pool) { } } +void MemTrackerManager::_add_metrics_unlocked(const std::string& mem_pool, UniqueLockType& lock) { + std::call_once(_register_metrics_hook_once_flag, [this] { + StarRocksMetrics::instance()->metrics()->register_hook("mem_pool_metrics_hook", [this] { _update_metrics(); }); + }); + + if (_shared_mem_trackers_metrics.contains(mem_pool)) { + return; + } + + // We have to free the write_lock during metrics registration to prevent a deadlock with the metrics collector. + // The collector will first claim MetricsRegistry::mutex and then MemTrackerManager::mutex due to _update_metrics() + // callback hook. 
At this point in the code, we are holding the MemTrackerManager::mutex and will try to claim the + // MetricsRegistry::mutex after calling register_metric. Without unlocking, we would run into a deadlock. + lock.unlock(); + + auto* registry = StarRocksMetrics::instance()->metrics(); + + auto mem_limit = std::make_unique(MetricUnit::BYTES); + const bool mem_limit_registered = registry->register_metric("mem_pool_mem_limit_bytes", + MetricLabels().add("name", mem_pool), mem_limit.get()); + + auto mem_usage_bytes = std::make_unique(MetricUnit::BYTES); + const bool mem_usage_bytes_registered = registry->register_metric( + "mem_pool_mem_usage_bytes", MetricLabels().add("name", mem_pool), mem_usage_bytes.get()); + + auto mem_usage_ratio = std::make_unique(MetricUnit::PERCENT); + const bool mem_usage_ratio_registered = registry->register_metric( + "mem_pool_mem_usage_ratio", MetricLabels().add("name", mem_pool), mem_usage_ratio.get()); + + auto workgroup_count = std::make_unique(MetricUnit::NOUNIT); + const bool workgroup_count_registered = registry->register_metric( + "mem_pool_workgroup_count", MetricLabels().add("name", mem_pool), workgroup_count.get()); + + lock.lock(); + + // During the unlocked period above, another thread might have already inserted a metrics entry. Use it if exists. + const std::shared_ptr& metrics = + _shared_mem_trackers_metrics.contains(mem_pool) + ? _shared_mem_trackers_metrics.at(mem_pool) + : _shared_mem_trackers_metrics.emplace(mem_pool, std::make_shared()) + .first->second; + + // During the unlocked period above, other threads might have tried to register the same metrics and succeeded in + // some of them. register_metric will return false if you try to register a metric with the same name and labels + // that is already registered. Therefore, we only std::move the individual metrics which this thread registered. 
+ if (mem_limit_registered) { + metrics->mem_limit = std::move(mem_limit); + } + if (mem_usage_bytes_registered) { + metrics->mem_usage_bytes = std::move(mem_usage_bytes); + } + if (mem_usage_ratio_registered) { + metrics->mem_usage_ratio = std::move(mem_usage_ratio); + } + if (workgroup_count_registered) { + metrics->workgroup_count = std::move(workgroup_count); + } + + // Note: It is also theoretically possible that the mem_tracker was deleted during the unlocked period. + // We are ok with leaking a metric in this case and do not try to unregister. +} + +void MemTrackerManager::_update_metrics() { + std::shared_lock read_lock(_mutex); + _update_metrics_unlocked(read_lock); +} + +void MemTrackerManager::_update_metrics_unlocked(SharedLockType&) { + const auto divide = [](const int64_t a, const int64_t b) { return b <= 0 ? 0.0 : static_cast(a) / b; }; + + for (auto& [mem_pool, metrics] : _shared_mem_trackers_metrics) { + if (const auto it = _shared_mem_trackers.find(mem_pool); it != _shared_mem_trackers.end()) { + const auto& [mem_tracker, child_count] = it->second; + if (metrics->mem_limit) { + metrics->mem_limit->set_value(mem_tracker->limit()); + } + if (metrics->mem_usage_bytes) { + metrics->mem_usage_bytes->set_value(mem_tracker->consumption()); + } + if (metrics->mem_usage_ratio) { + metrics->mem_usage_ratio->set_value(divide(mem_tracker->consumption(), mem_tracker->limit())); + } + if (metrics->workgroup_count) { + metrics->workgroup_count->set_value(child_count); + } + } else { + // Metrics entries for deleted shared_mem_trackers are never deleted, but simply set to 0. 
+ if (metrics->mem_limit) { + metrics->mem_limit->set_value(0); + } + if (metrics->mem_usage_bytes) { + metrics->mem_usage_bytes->set_value(0); + } + if (metrics->mem_usage_ratio) { + metrics->mem_usage_ratio->set_value(0); + } + if (metrics->workgroup_count) { + metrics->workgroup_count->set_value(0); + } + } + } +} + std::vector MemTrackerManager::list_mem_trackers() const { + std::shared_lock read_lock(_mutex); std::vector mem_trackers{}; - mem_trackers.reserve(_shared_mem_trackers.size() + 1); + mem_trackers.reserve(_shared_mem_trackers.size()); for (const auto& mem_tracker : _shared_mem_trackers) { mem_trackers.push_back(mem_tracker.first); } - mem_trackers.push_back(WorkGroup::DEFAULT_MEM_POOL); return mem_trackers; } } // namespace starrocks::workgroup \ No newline at end of file diff --git a/be/src/exec/workgroup/mem_tracker_manager.h b/be/src/exec/workgroup/mem_tracker_manager.h index 03225587b9db03..32ca9794499246 100644 --- a/be/src/exec/workgroup/mem_tracker_manager.h +++ b/be/src/exec/workgroup/mem_tracker_manager.h @@ -15,22 +15,30 @@ #pragma once #include +#include #include "runtime/mem_tracker.h" #include "work_group_fwd.h" namespace starrocks::workgroup { +struct MemTrackerMetrics { + std::unique_ptr mem_limit = nullptr; + std::unique_ptr mem_usage_bytes = nullptr; + std::unique_ptr mem_usage_ratio = nullptr; + std::unique_ptr workgroup_count = nullptr; +}; + using MemTrackerPtr = std::shared_ptr; +using MemTrackerMetricsPtr = std::shared_ptr; struct MemTrackerInfo { MemTrackerPtr tracker; uint32_t child_count; }; -struct MemTrackerManager { +class MemTrackerManager { public: - std::vector list_mem_trackers() const; /** * Constructs and returns a shared_mem_tracker for the workgroup if one does not already exist. * Otherwise, returns the existing instance and increments the number of tracked workgroups by one. 
@@ -45,7 +53,23 @@ struct MemTrackerManager { */ void deregister_workgroup(const std::string& mem_pool); + /** + * Returns a list of currently registered non-default memory trackers. + */ + std::vector list_mem_trackers() const; + private: + using MutexType = std::shared_mutex; + using UniqueLockType = std::unique_lock; + using SharedLockType = std::shared_lock; + + void _add_metrics_unlocked(const std::string& mem_pool, UniqueLockType& lock); + void _update_metrics(); + void _update_metrics_unlocked(SharedLockType&); + + mutable std::shared_mutex _mutex; + std::once_flag _register_metrics_hook_once_flag; std::unordered_map _shared_mem_trackers{}; + std::unordered_map _shared_mem_trackers_metrics{}; }; } // namespace starrocks::workgroup diff --git a/be/test/exec/workgroup/work_group_manager_test.cpp b/be/test/exec/workgroup/work_group_manager_test.cpp index b484053c62eb15..fdfcbce98a67d7 100644 --- a/be/test/exec/workgroup/work_group_manager_test.cpp +++ b/be/test/exec/workgroup/work_group_manager_test.cpp @@ -107,7 +107,7 @@ PARALLEL_TEST(WorkGroupManagerTest, test_if_unused_memory_pools_are_cleaned_up) _manager->apply(create_operations); - EXPECT_EQ(_manager->list_memory_pools().size(), 2); + EXPECT_EQ(_manager->list_memory_pools().size(), 1); // Version must be strictly larger, otherwise workgroup will not be deleted twg1.version++; @@ -117,11 +117,10 @@ PARALLEL_TEST(WorkGroupManagerTest, test_if_unused_memory_pools_are_cleaned_up) make_twg_op(twg2, TWorkGroupOpType::WORKGROUP_OP_DELETE)}; _manager->apply(delete_operations); - std::this_thread::sleep_for(std::chrono::seconds(1)); // The expired workgroups will only get erased in the next call to apply _manager->apply({}); - EXPECT_EQ(_manager->list_memory_pools().size(), 1); + EXPECT_EQ(_manager->list_memory_pools().size(), 0); } _manager->destroy(); } diff --git a/docs/en/administration/management/monitoring/metrics.md b/docs/en/administration/management/monitoring/metrics.md index 
cc8b683e01fc36..8c8983a8b79e7d 100644 --- a/docs/en/administration/management/monitoring/metrics.md +++ b/docs/en/administration/management/monitoring/metrics.md @@ -579,6 +579,30 @@ For more information on how to build a monitoring service for your StarRocks clu - Type: Instantaneous - Description: Instantaneous value of resource group memory usage. +### starrocks_be_mem_pool_mem_limit_bytes + +- Unit: Bytes +- Type: Instantaneous +- Description: Memory limit for each memory pool, measured in bytes. + +### starrocks_be_mem_pool_mem_usage_bytes + +- Unit: Bytes +- Type: Instantaneous +- Description: Total memory currently used by each memory pool, measured in bytes. + +### starrocks_be_mem_pool_mem_usage_ratio + +- Unit: - +- Type: Instantaneous +- Description: Ratio of each memory pool's memory usage to its memory limit. + +### starrocks_be_mem_pool_workgroup_count + +- Unit: Count +- Type: Instantaneous +- Description: Number of resource groups assigned to each memory pool. 
+ ### starrocks_be_pipe_prepare_pool_queue_len - Unit: Count diff --git a/docs/ja/administration/management/monitoring/metrics.md b/docs/ja/administration/management/monitoring/metrics.md index 811cacc829906f..0288f58cfe7b8a 100644 --- a/docs/ja/administration/management/monitoring/metrics.md +++ b/docs/ja/administration/management/monitoring/metrics.md @@ -579,6 +579,30 @@ StarRocks クラスタのモニタリングサービスの構築方法につい - タイプ: Instantaneous - 説明: リソースグループのメモリ使用量の瞬時値。 +### starrocks_be_mem_pool_mem_limit_bytes + +- 単位: Bytes +- タイプ: Instantaneous +- 説明: メモリプールのメモリクォータの瞬時値。 + +### starrocks_be_mem_pool_mem_usage_bytes + +- 単位: Bytes +- タイプ: Instantaneous +- 説明: メモリプールのメモリ使用量の瞬時値。 + +### starrocks_be_mem_pool_mem_usage_ratio + +- 単位: - +- タイプ: Instantaneous +- 説明: メモリプールのメモリクォータに対するメモリ使用量の比率。 + +### starrocks_be_mem_pool_workgroup_count + +- 単位: Count +- タイプ: Instantaneous +- 説明: メモリプールに割り当てられたリソースグループ数の瞬時値。 + ### starrocks_be_pipe_prepare_pool_queue_len - 単位: Count diff --git a/docs/zh/administration/management/monitoring/metrics.md b/docs/zh/administration/management/monitoring/metrics.md index 5daceb19c27826..903bc3c769f0aa 100644 --- a/docs/zh/administration/management/monitoring/metrics.md +++ b/docs/zh/administration/management/monitoring/metrics.md @@ -579,6 +579,30 @@ displayed_sidebar: docs - 类型:瞬时值 - 描述:该资源组内存使用率瞬时值 +### starrocks_be_mem_pool_mem_limit_bytes + +- 单位:Byte +- 类型:瞬时值 +- 描述:该内存池内存配额的瞬时值 + +### starrocks_be_mem_pool_mem_usage_bytes + +- 单位:Byte +- 类型:瞬时值 +- 描述:该内存池当前内存使用量的瞬时值 + +### starrocks_be_mem_pool_mem_usage_ratio + +- 单位:- +- 类型:瞬时值 +- 描述:该内存池内存使用量与内存配额比率的瞬时值 + +### starrocks_be_mem_pool_workgroup_count + +- 单位:个 +- 类型:瞬时值 +- 描述:该内存池分配的资源组数量的瞬时值 + ### starrocks_be_pipe_prepare_pool_queue_len - 单位:个