diff --git a/be/src/exec/workgroup/mem_tracker_manager.cpp b/be/src/exec/workgroup/mem_tracker_manager.cpp index 37b5e9d9f100cb..78a980e21fa841 100644 --- a/be/src/exec/workgroup/mem_tracker_manager.cpp +++ b/be/src/exec/workgroup/mem_tracker_manager.cpp @@ -21,7 +21,8 @@ #include "work_group.h" namespace starrocks::workgroup { -MemTrackerManager::MemTrackerPtr MemTrackerManager::get_parent_mem_tracker(const WorkGroupPtr& wg) { + +MemTrackerPtr MemTrackerManager::register_workgroup(const WorkGroupPtr& wg) { if (WorkGroup::DEFAULT_MEM_POOL == wg->mem_pool()) { return GlobalEnv::GetInstance()->query_pool_mem_tracker_shared(); } @@ -34,13 +35,15 @@ MemTrackerManager::MemTrackerPtr MemTrackerManager::get_parent_mem_tracker(const // the same mem_pool also have the same mem_limit. if (_shared_mem_trackers.contains(wg->mem_pool())) { // We must handle an edge case: - // 1. All RGs using a specific mem_pool are deleted. + // 1. All RGs using a specific mem_pool are marked for deletion. // 2. The shared tracker for that pool remains cached here. // 3. A new RG is created with the same mem_pool name but a different mem_limit. // Therefore, we must verify the cached tracker's limit matches the current RG's limit. - if (auto& shared_mem_tracker = _shared_mem_trackers.at(wg->mem_pool()); - shared_mem_tracker->limit() == memory_limit_bytes) { - return shared_mem_tracker; + // Otherwise, a new mem_pool with thew new mem_limit will be constructed, and expiring children are orphaned. + if (MemTrackerInfo& tracker_info = _shared_mem_trackers.at(wg->mem_pool()); + tracker_info.tracker->limit() == memory_limit_bytes) { + tracker_info.child_count++; + return tracker_info.tracker; } } @@ -48,7 +51,36 @@ MemTrackerManager::MemTrackerPtr MemTrackerManager::get_parent_mem_tracker(const std::make_shared(MemTrackerType::RESOURCE_GROUP_SHARED_MEMORY_POOL, memory_limit_bytes, wg->mem_pool(), GlobalEnv::GetInstance()->query_pool_mem_tracker()); - _shared_mem_trackers.insert_or_assign(wg->mem_pool(), shared_mem_tracker); + _shared_mem_trackers[wg->mem_pool()].tracker = shared_mem_tracker; + _shared_mem_trackers[wg->mem_pool()].child_count++; // also handles orphaned children + return shared_mem_tracker; } + +void MemTrackerManager::deregister_workgroup(const std::string& mem_pool) { + if (WorkGroup::DEFAULT_MEM_POOL == mem_pool) { + return; + } + + if (_shared_mem_trackers.contains(mem_pool)) { + MemTrackerInfo& tracker_info = _shared_mem_trackers.at(mem_pool); + if (tracker_info.child_count == 1) { + // The shared tracker will be erased if its last child is being deregistered + _shared_mem_trackers.erase(mem_pool); + } else { + DCHECK(tracker_info.child_count > 1); + tracker_info.child_count--; + } + } +} + +std::vector MemTrackerManager::list_mem_trackers() const { + std::vector mem_trackers{}; + mem_trackers.reserve(_shared_mem_trackers.size() + 1); + for (const auto& mem_tracker : _shared_mem_trackers) { + mem_trackers.push_back(mem_tracker.first); + } + mem_trackers.push_back(WorkGroup::DEFAULT_MEM_POOL); + return mem_trackers; +} } // namespace starrocks::workgroup \ No newline at end of file diff --git a/be/src/exec/workgroup/mem_tracker_manager.h b/be/src/exec/workgroup/mem_tracker_manager.h index ab781a81524aa7..03225587b9db03 100644 --- a/be/src/exec/workgroup/mem_tracker_manager.h +++ b/be/src/exec/workgroup/mem_tracker_manager.h @@ -20,12 +20,32 @@ #include "work_group_fwd.h" namespace starrocks::workgroup { + +using MemTrackerPtr = std::shared_ptr; + +struct MemTrackerInfo { + MemTrackerPtr tracker; + uint32_t child_count; +}; + struct MemTrackerManager { public: - using MemTrackerPtr = std::shared_ptr; - MemTrackerPtr get_parent_mem_tracker(const WorkGroupPtr& wg); + std::vector list_mem_trackers() const; + /** + * Constructs and returns a shared_mem_tracker for the workgroup if one does not already exist. + * Otherwise, returns the existing instance and increments the number of tracked workgroups by one. + * This method must be called whenever a new workgroup is constructed. + */ + [[nodiscard]] MemTrackerPtr register_workgroup(const WorkGroupPtr& wg); + + /** + * Decrements the number of tracked workgroups of the given memory pool by one. + * If all its tracked children are deregistered, the memory pool entry will be erased. + * This method must be called whenever an existing workgroup is destructed. + */ + void deregister_workgroup(const std::string& mem_pool); private: - std::unordered_map _shared_mem_trackers{}; + std::unordered_map _shared_mem_trackers{}; }; } // namespace starrocks::workgroup diff --git a/be/src/exec/workgroup/work_group.cpp b/be/src/exec/workgroup/work_group.cpp index 610c1515c713e1..73419b5bfd4d0c 100644 --- a/be/src/exec/workgroup/work_group.cpp +++ b/be/src/exec/workgroup/work_group.cpp @@ -525,10 +525,12 @@ void WorkGroupManager::apply(const std::vector& ops) { while (it != _workgroup_expired_versions.end()) { auto wg_it = _workgroups.find(*it); if (wg_it != _workgroups.end() && wg_it->second->is_removable()) { - auto id = wg_it->second->id(); - auto version = wg_it->second->version(); + const auto id = wg_it->second->id(); + const auto version = wg_it->second->version(); + const auto mem_pool = wg_it->second->mem_pool(); _sum_cpu_weight -= wg_it->second->cpu_weight(); _workgroups.erase(wg_it); + _shared_mem_tracker_manager.deregister_workgroup(mem_pool); auto version_it = _workgroup_versions.find(id); if (version_it != _workgroup_versions.end() && version_it->second <= version) { _workgroup_versions.erase(version_it); @@ -564,7 +566,7 @@ void WorkGroupManager::create_workgroup_unlocked(const WorkGroupPtr& wg, UniqueL return; } - auto parent_mem_tracker = _shared_mem_tracker_manager.get_parent_mem_tracker(wg); + auto parent_mem_tracker = _shared_mem_tracker_manager.register_workgroup(wg); wg->init(parent_mem_tracker); _workgroups[unique_id] = wg; @@ -579,7 +581,7 @@ void WorkGroupManager::create_workgroup_unlocked(const WorkGroupPtr& wg, UniqueL auto& old_wg = _workgroups[old_unique_id]; _executors_manager.reclaim_cpuids_from_worgroup(old_wg.get()); - old_wg->mark_del(); + old_wg->mark_del(_workgroup_expiration_time); _workgroup_expired_versions.push_back(old_unique_id); LOG(INFO) << "workgroup expired version: " << wg->name() << "(" << wg->id() << "," << stale_version << ")"; @@ -633,7 +635,7 @@ void WorkGroupManager::delete_workgroup_unlocked(const WorkGroupPtr& wg) { if (wg_it != _workgroups.end()) { const auto& old_wg = wg_it->second; _executors_manager.reclaim_cpuids_from_worgroup(old_wg.get()); - old_wg->mark_del(); + old_wg->mark_del(_workgroup_expiration_time); _executors_manager.update_shared_executors(); _workgroup_expired_versions.push_back(unique_id); LOG(INFO) << "workgroup expired version: " << wg->name() << "(" << wg->id() << "," << curr_version << ")"; @@ -652,6 +654,11 @@ std::vector WorkGroupManager::list_workgroups() { return alive_workgroups; } +std::vector WorkGroupManager::list_memory_pools() const { + std::shared_lock read_lock(_mutex); + return _shared_mem_tracker_manager.list_mem_trackers(); +} + void WorkGroupManager::for_each_workgroup(const WorkGroupConsumer& consumer) const { std::shared_lock read_lock(_mutex); for (const auto& [_, wg] : _workgroups) { @@ -687,6 +694,11 @@ void WorkGroupManager::change_enable_resource_group_cpu_borrowing(const bool val _executors_manager.change_enable_resource_group_cpu_borrowing(val); } +void WorkGroupManager::set_workgroup_expiration_time(const std::chrono::seconds value) { + std::unique_lock write_lock(_mutex); + _workgroup_expiration_time = value; +} + // ------------------------------------------------------------------------------------ // DefaultWorkGroupInitialization // ------------------------------------------------------------------------------------ diff --git a/be/src/exec/workgroup/work_group.h b/be/src/exec/workgroup/work_group.h index 7c297ac01d7cbf..bc21e0407f1e8b 100644 --- a/be/src/exec/workgroup/work_group.h +++ b/be/src/exec/workgroup/work_group.h @@ -36,8 +36,7 @@ class TWorkGroup; namespace workgroup { -using seconds = std::chrono::seconds; -using milliseconds = std::chrono::microseconds; +using vacuum_time_precision = std::chrono::microseconds; using steady_clock = std::chrono::steady_clock; using std::chrono::duration_cast; @@ -158,25 +157,25 @@ class WorkGroup : public std::enable_shared_from_this { // mark the workgroup is deleted, but at the present, it can not be removed from WorkGroupManager, because // 1. there exists pending drivers // 2. there is a race condition that a driver is attached to the workgroup after it is marked del. - void mark_del() { + void mark_del(const std::chrono::seconds expiration_time) { bool expect_false = false; if (_is_marked_del.compare_exchange_strong(expect_false, true)) { - static constexpr seconds expire_seconds{120}; - _vacuum_ttl = duration_cast(steady_clock::now().time_since_epoch() + expire_seconds).count(); + _vacuum_ttl = duration_cast(steady_clock::now().time_since_epoch() + expiration_time) + .count(); } } // no drivers shall be added to this workgroup bool is_marked_del() const { return _is_marked_del.load(std::memory_order_acquire); } // a workgroup should wait several seconds to be cleaned safely. bool is_expired() const { - auto now = duration_cast(steady_clock::now().time_since_epoch()).count(); + const auto now = duration_cast(steady_clock::now().time_since_epoch()).count(); return now > _vacuum_ttl; } // return true if current workgroup is removable: // 1. is already marked del // 2. no pending drivers exists - // 3. wait for a period of vacuum_ttl to prevent race condition + // 3. wait until vacuum_ttl to prevent race condition bool is_removable() const { return is_marked_del() && _num_running_drivers.load(std::memory_order_acquire) == 0 && is_expired(); } @@ -244,8 +243,8 @@ class WorkGroup : public std::enable_shared_from_this { int64_t _spill_mem_limit_bytes = -1; std::string _mem_pool; - std::shared_ptr _mem_tracker = nullptr; std::shared_ptr _shared_mem_tracker = nullptr; + std::shared_ptr _mem_tracker = nullptr; std::shared_ptr _connector_scan_mem_tracker = nullptr; WorkGroupDriverSchedEntity _driver_sched_entity; @@ -256,6 +255,7 @@ class WorkGroup : public std::enable_shared_from_this { std::atomic _num_running_drivers = 0; std::atomic _acc_num_drivers = 0; + // vacuum_ttl is set to max, as a data race might cause a thread to read `is_marked_del = true` and `vacuum_ttl = 0` int64_t _vacuum_ttl = std::numeric_limits::max(); // Metrics of this workgroup @@ -296,6 +296,7 @@ class WorkGroupManager { void apply(const std::vector& ops); std::vector list_workgroups(); + std::vector list_memory_pools() const; using WorkGroupConsumer = std::function; void for_each_workgroup(const WorkGroupConsumer& consumer) const; @@ -307,6 +308,7 @@ class WorkGroupManager { void for_each_executors(const ExecutorsManager::ExecutorsConsumer& consumer) const; void change_num_connector_scan_threads(uint32_t num_connector_scan_threads); void change_enable_resource_group_cpu_borrowing(bool val); + void set_workgroup_expiration_time(std::chrono::seconds value); private: using MutexType = std::shared_mutex; @@ -333,6 +335,7 @@ class WorkGroupManager { std::unordered_map _workgroups; std::unordered_map _workgroup_versions; std::list _workgroup_expired_versions; + std::chrono::seconds _workgroup_expiration_time{120}; std::atomic _sum_cpu_weight = 0; MemTrackerManager _shared_mem_tracker_manager; diff --git a/be/test/exec/workgroup/mem_tracker_manager_test.cpp b/be/test/exec/workgroup/mem_tracker_manager_test.cpp index 8624e9980cbeaf..91af86770dcdbf 100644 --- a/be/test/exec/workgroup/mem_tracker_manager_test.cpp +++ b/be/test/exec/workgroup/mem_tracker_manager_test.cpp @@ -25,7 +25,7 @@ PARALLEL_TEST(MemTrackerMangerTest, test_mem_tracker_for_default_mem_pool) { const auto work_group{std::make_shared("default_wg", 123, WorkGroup::DEFAULT_VERSION, 1, 0.5, 0, 0.9, WorkGroupType::WG_DEFAULT, WorkGroup::DEFAULT_MEM_POOL)}; - const auto tracker{manager.get_parent_mem_tracker(work_group)}; + const auto tracker{manager.register_workgroup(work_group)}; ASSERT_EQ(tracker, GlobalEnv::GetInstance()->query_pool_mem_tracker_shared()); } @@ -38,8 +38,12 @@ PARALLEL_TEST(MemTrackerMangerTest, test_mem_tracker_for_custom_mem_pool) { const auto work_group3{std::make_shared("wg_2", 134, WorkGroup::DEFAULT_VERSION, 1, 0.5, 0, 0.9, WorkGroupType::WG_DEFAULT, "other_pool")}; - ASSERT_EQ(manager.get_parent_mem_tracker(work_group1), manager.get_parent_mem_tracker(work_group2)); - ASSERT_NE(manager.get_parent_mem_tracker(work_group1), manager.get_parent_mem_tracker(work_group3)); + const MemTrackerPtr parent_workgroup_1 = manager.register_workgroup(work_group1); + const MemTrackerPtr parent_workgroup_2 = manager.register_workgroup(work_group2); + const MemTrackerPtr parent_workgroup_3 = manager.register_workgroup(work_group3); + + ASSERT_EQ(parent_workgroup_1, parent_workgroup_2); + ASSERT_NE(parent_workgroup_1, parent_workgroup_3); } PARALLEL_TEST(MemTrackerMangerTest, test_mem_tracker_for_custom_mem_pool_overwrite) { MemTrackerManager manager; @@ -48,6 +52,9 @@ PARALLEL_TEST(MemTrackerMangerTest, test_mem_tracker_for_custom_mem_pool_overwri const auto work_group2{std::make_shared("wg_2", 134, WorkGroup::DEFAULT_VERSION, 1, 0.7, 0, 0.9, WorkGroupType::WG_DEFAULT, "test_pool")}; - ASSERT_NE(manager.get_parent_mem_tracker(work_group1), manager.get_parent_mem_tracker(work_group2)); + const MemTrackerPtr parent_workgroup1 = manager.register_workgroup(work_group1); + const MemTrackerPtr parent_workgroup2 = manager.register_workgroup(work_group2); + + ASSERT_NE(parent_workgroup1, parent_workgroup2); } } // namespace starrocks::workgroup diff --git a/be/test/exec/workgroup/work_group_manager_test.cpp b/be/test/exec/workgroup/work_group_manager_test.cpp index 9d0df8aae67216..4154130f71d977 100644 --- a/be/test/exec/workgroup/work_group_manager_test.cpp +++ b/be/test/exec/workgroup/work_group_manager_test.cpp @@ -30,6 +30,13 @@ TWorkGroup create_twg(const int64_t id, const int64_t version, const std::string return twg; } +TWorkGroupOp make_twg_op(const TWorkGroup& twg, const TWorkGroupOpType::type op_type) { + TWorkGroupOp op; + op.__set_workgroup(twg); + op.__set_op_type(op_type); + return op; +} + PARALLEL_TEST(WorkGroupManagerTest, add_workgroups_different_mem_pools) { PipelineExecutorSetConfig config{10, 1, 1, 1, CpuUtil::CpuIds{}, false, false, nullptr}; auto _manager = std::make_unique(config); @@ -82,4 +89,40 @@ PARALLEL_TEST(WorkGroupManagerTest, add_workgroups_same_mem_pools) { } _manager->destroy(); } + +PARALLEL_TEST(WorkGroupManagerTest, test_if_unused_memory_pools_are_cleaned_up) { + PipelineExecutorSetConfig config{10, 1, 1, 1, CpuUtil::CpuIds{}, false, false, nullptr}; + auto _manager = std::make_unique(config); + _manager->set_workgroup_expiration_time(std::chrono::seconds(0)); + { + auto twg1 = create_twg(110, 1, "wg110", "test_pool_2", 0.5); + auto twg2 = create_twg(111, 1, "wg111", "test_pool_2", 0.5); + auto twg3 = create_twg(112, 1, "wg112", WorkGroup::DEFAULT_MEM_POOL, 0.5); + + std::vector create_operations{ + make_twg_op(twg1, TWorkGroupOpType::WORKGROUP_OP_CREATE), + make_twg_op(twg2, TWorkGroupOpType::WORKGROUP_OP_CREATE), + make_twg_op(twg3, TWorkGroupOpType::WORKGROUP_OP_CREATE), + }; + + _manager->apply(create_operations); + + EXPECT_EQ(_manager->list_memory_pools().size(), 2); + + // Version must be strictly larger, otherwise workgroup will not be deleted + twg1.version++; + twg2.version++; + + std::vector delete_operations{make_twg_op(twg1, TWorkGroupOpType::WORKGROUP_OP_DELETE), + make_twg_op(twg2, TWorkGroupOpType::WORKGROUP_OP_DELETE)}; + + _manager->apply(delete_operations); + std::this_thread::sleep_for(std::chrono::seconds(1)); + // The expired workgroups will only get erased in the next call to apply + _manager->apply({}); + + EXPECT_EQ(_manager->list_memory_pools().size(), 1); + } + _manager->destroy(); +} } // namespace starrocks::workgroup