Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions be/src/exec/workgroup/mem_tracker_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
#include "work_group.h"

namespace starrocks::workgroup {
MemTrackerManager::MemTrackerPtr MemTrackerManager::get_parent_mem_tracker(const WorkGroupPtr& wg) {

MemTrackerPtr MemTrackerManager::register_workgroup(const WorkGroupPtr& wg) {
if (WorkGroup::DEFAULT_MEM_POOL == wg->mem_pool()) {
return GlobalEnv::GetInstance()->query_pool_mem_tracker_shared();
}
Expand All @@ -34,21 +35,52 @@ MemTrackerManager::MemTrackerPtr MemTrackerManager::get_parent_mem_tracker(const
// the same mem_pool also have the same mem_limit.
if (_shared_mem_trackers.contains(wg->mem_pool())) {
// We must handle an edge case:
// 1. All RGs using a specific mem_pool are deleted.
// 1. All RGs using a specific mem_pool are marked for deletion.
// 2. The shared tracker for that pool remains cached here.
// 3. A new RG is created with the same mem_pool name but a different mem_limit.
// Therefore, we must verify the cached tracker's limit matches the current RG's limit.
if (auto& shared_mem_tracker = _shared_mem_trackers.at(wg->mem_pool());
shared_mem_tracker->limit() == memory_limit_bytes) {
return shared_mem_tracker;
// Otherwise, a new mem_pool with thew new mem_limit will be constructed, and expiring children are orphaned.
if (MemTrackerInfo& tracker_info = _shared_mem_trackers.at(wg->mem_pool());
tracker_info.tracker->limit() == memory_limit_bytes) {
tracker_info.child_count++;
return tracker_info.tracker;
}
}

auto shared_mem_tracker =
std::make_shared<MemTracker>(MemTrackerType::RESOURCE_GROUP_SHARED_MEMORY_POOL, memory_limit_bytes,
wg->mem_pool(), GlobalEnv::GetInstance()->query_pool_mem_tracker());

_shared_mem_trackers.insert_or_assign(wg->mem_pool(), shared_mem_tracker);
_shared_mem_trackers[wg->mem_pool()].tracker = shared_mem_tracker;
_shared_mem_trackers[wg->mem_pool()].child_count++; // also handles orphaned children

return shared_mem_tracker;
}

void MemTrackerManager::deregister_workgroup(const std::string& mem_pool) {
if (WorkGroup::DEFAULT_MEM_POOL == mem_pool) {
return;
}

if (_shared_mem_trackers.contains(mem_pool)) {
MemTrackerInfo& tracker_info = _shared_mem_trackers.at(mem_pool);
if (tracker_info.child_count == 1) {
// The shared tracker will be erased if its last child is being deregistered
_shared_mem_trackers.erase(mem_pool);
} else {
DCHECK(tracker_info.child_count > 1);
tracker_info.child_count--;
}
}
}

std::vector<std::string> MemTrackerManager::list_mem_trackers() const {
std::vector<std::string> mem_trackers{};
mem_trackers.reserve(_shared_mem_trackers.size() + 1);
for (const auto& mem_tracker : _shared_mem_trackers) {
mem_trackers.push_back(mem_tracker.first);
}
mem_trackers.push_back(WorkGroup::DEFAULT_MEM_POOL);
return mem_trackers;
}
} // namespace starrocks::workgroup
26 changes: 23 additions & 3 deletions be/src/exec/workgroup/mem_tracker_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,32 @@
#include "work_group_fwd.h"

namespace starrocks::workgroup {

using MemTrackerPtr = std::shared_ptr<MemTracker>;

struct MemTrackerInfo {
MemTrackerPtr tracker;
uint32_t child_count;
};

struct MemTrackerManager {
public:
using MemTrackerPtr = std::shared_ptr<MemTracker>;
MemTrackerPtr get_parent_mem_tracker(const WorkGroupPtr& wg);
std::vector<std::string> list_mem_trackers() const;
/**
* Constructs and returns a shared_mem_tracker for the workgroup if one does not already exist.
* Otherwise, returns the existing instance and increments the number of tracked workgroups by one.
* This method must be called whenever a new workgroup is constructed.
*/
[[nodiscard]] MemTrackerPtr register_workgroup(const WorkGroupPtr& wg);

/**
* Decrements the number of tracked workgroups of the given memory pool by one.
* If all its tracked children are deregistered, the memory pool entry will be erased.
* This method must be called whenever an existing workgroup is destructed.
*/
void deregister_workgroup(const std::string& mem_pool);

private:
std::unordered_map<std::string, MemTrackerPtr> _shared_mem_trackers{};
std::unordered_map<std::string, MemTrackerInfo> _shared_mem_trackers{};
};
} // namespace starrocks::workgroup
22 changes: 17 additions & 5 deletions be/src/exec/workgroup/work_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,12 @@ void WorkGroupManager::apply(const std::vector<TWorkGroupOp>& ops) {
while (it != _workgroup_expired_versions.end()) {
auto wg_it = _workgroups.find(*it);
if (wg_it != _workgroups.end() && wg_it->second->is_removable()) {
auto id = wg_it->second->id();
auto version = wg_it->second->version();
const auto id = wg_it->second->id();
const auto version = wg_it->second->version();
const auto mem_pool = wg_it->second->mem_pool();
_sum_cpu_weight -= wg_it->second->cpu_weight();
_workgroups.erase(wg_it);
_shared_mem_tracker_manager.deregister_workgroup(mem_pool);
auto version_it = _workgroup_versions.find(id);
if (version_it != _workgroup_versions.end() && version_it->second <= version) {
_workgroup_versions.erase(version_it);
Expand Down Expand Up @@ -564,7 +566,7 @@ void WorkGroupManager::create_workgroup_unlocked(const WorkGroupPtr& wg, UniqueL
return;
}

auto parent_mem_tracker = _shared_mem_tracker_manager.get_parent_mem_tracker(wg);
auto parent_mem_tracker = _shared_mem_tracker_manager.register_workgroup(wg);
wg->init(parent_mem_tracker);
_workgroups[unique_id] = wg;

Expand All @@ -579,7 +581,7 @@ void WorkGroupManager::create_workgroup_unlocked(const WorkGroupPtr& wg, UniqueL
auto& old_wg = _workgroups[old_unique_id];

_executors_manager.reclaim_cpuids_from_worgroup(old_wg.get());
old_wg->mark_del();
old_wg->mark_del(_workgroup_expiration_time);
_workgroup_expired_versions.push_back(old_unique_id);
LOG(INFO) << "workgroup expired version: " << wg->name() << "(" << wg->id() << "," << stale_version << ")";

Expand Down Expand Up @@ -633,7 +635,7 @@ void WorkGroupManager::delete_workgroup_unlocked(const WorkGroupPtr& wg) {
if (wg_it != _workgroups.end()) {
const auto& old_wg = wg_it->second;
_executors_manager.reclaim_cpuids_from_worgroup(old_wg.get());
old_wg->mark_del();
old_wg->mark_del(_workgroup_expiration_time);
_executors_manager.update_shared_executors();
_workgroup_expired_versions.push_back(unique_id);
LOG(INFO) << "workgroup expired version: " << wg->name() << "(" << wg->id() << "," << curr_version << ")";
Expand All @@ -652,6 +654,11 @@ std::vector<TWorkGroup> WorkGroupManager::list_workgroups() {
return alive_workgroups;
}

std::vector<std::string> WorkGroupManager::list_memory_pools() const {
std::shared_lock read_lock(_mutex);
return _shared_mem_tracker_manager.list_mem_trackers();
}

void WorkGroupManager::for_each_workgroup(const WorkGroupConsumer& consumer) const {
std::shared_lock read_lock(_mutex);
for (const auto& [_, wg] : _workgroups) {
Expand Down Expand Up @@ -687,6 +694,11 @@ void WorkGroupManager::change_enable_resource_group_cpu_borrowing(const bool val
_executors_manager.change_enable_resource_group_cpu_borrowing(val);
}

void WorkGroupManager::set_workgroup_expiration_time(const std::chrono::seconds value) {
std::unique_lock write_lock(_mutex);
_workgroup_expiration_time = value;
}

// ------------------------------------------------------------------------------------
// DefaultWorkGroupInitialization
// ------------------------------------------------------------------------------------
Expand Down
19 changes: 11 additions & 8 deletions be/src/exec/workgroup/work_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class TWorkGroup;

namespace workgroup {

using seconds = std::chrono::seconds;
using milliseconds = std::chrono::microseconds;
using vacuum_time_precision = std::chrono::microseconds;
using steady_clock = std::chrono::steady_clock;
using std::chrono::duration_cast;

Expand Down Expand Up @@ -158,25 +157,25 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
// mark the workgroup is deleted, but at the present, it can not be removed from WorkGroupManager, because
// 1. there exists pending drivers
// 2. there is a race condition that a driver is attached to the workgroup after it is marked del.
void mark_del() {
void mark_del(const std::chrono::seconds expiration_time) {
bool expect_false = false;
if (_is_marked_del.compare_exchange_strong(expect_false, true)) {
static constexpr seconds expire_seconds{120};
_vacuum_ttl = duration_cast<milliseconds>(steady_clock::now().time_since_epoch() + expire_seconds).count();
_vacuum_ttl = duration_cast<vacuum_time_precision>(steady_clock::now().time_since_epoch() + expiration_time)
.count();
}
}
// no drivers shall be added to this workgroup
bool is_marked_del() const { return _is_marked_del.load(std::memory_order_acquire); }
// a workgroup should wait several seconds to be cleaned safely.
bool is_expired() const {
auto now = duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
const auto now = duration_cast<vacuum_time_precision>(steady_clock::now().time_since_epoch()).count();
return now > _vacuum_ttl;
}

// return true if current workgroup is removable:
// 1. is already marked del
// 2. no pending drivers exists
// 3. wait for a period of vacuum_ttl to prevent race condition
// 3. wait until vacuum_ttl to prevent race condition
bool is_removable() const {
return is_marked_del() && _num_running_drivers.load(std::memory_order_acquire) == 0 && is_expired();
}
Expand Down Expand Up @@ -244,8 +243,8 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
int64_t _spill_mem_limit_bytes = -1;

std::string _mem_pool;
std::shared_ptr<MemTracker> _mem_tracker = nullptr;
std::shared_ptr<MemTracker> _shared_mem_tracker = nullptr;
std::shared_ptr<MemTracker> _mem_tracker = nullptr;
std::shared_ptr<MemTracker> _connector_scan_mem_tracker = nullptr;

WorkGroupDriverSchedEntity _driver_sched_entity;
Expand All @@ -256,6 +255,7 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {

std::atomic<size_t> _num_running_drivers = 0;
std::atomic<size_t> _acc_num_drivers = 0;
// vacuum_ttl is set to max, as a data race might cause a thread to read `is_marked_del = true` and `vacuum_ttl = 0`
int64_t _vacuum_ttl = std::numeric_limits<int64_t>::max();

// Metrics of this workgroup
Expand Down Expand Up @@ -296,6 +296,7 @@ class WorkGroupManager {

void apply(const std::vector<TWorkGroupOp>& ops);
std::vector<TWorkGroup> list_workgroups();
std::vector<std::string> list_memory_pools() const;

using WorkGroupConsumer = std::function<void(const WorkGroup&)>;
void for_each_workgroup(const WorkGroupConsumer& consumer) const;
Expand All @@ -307,6 +308,7 @@ class WorkGroupManager {
void for_each_executors(const ExecutorsManager::ExecutorsConsumer& consumer) const;
void change_num_connector_scan_threads(uint32_t num_connector_scan_threads);
void change_enable_resource_group_cpu_borrowing(bool val);
void set_workgroup_expiration_time(std::chrono::seconds value);

private:
using MutexType = std::shared_mutex;
Expand All @@ -333,6 +335,7 @@ class WorkGroupManager {
std::unordered_map<int128_t, WorkGroupPtr> _workgroups;
std::unordered_map<int64_t, int64_t> _workgroup_versions;
std::list<int128_t> _workgroup_expired_versions;
std::chrono::seconds _workgroup_expiration_time{120};

std::atomic<size_t> _sum_cpu_weight = 0;
MemTrackerManager _shared_mem_tracker_manager;
Expand Down
15 changes: 11 additions & 4 deletions be/test/exec/workgroup/mem_tracker_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ PARALLEL_TEST(MemTrackerMangerTest, test_mem_tracker_for_default_mem_pool) {
const auto work_group{std::make_shared<WorkGroup>("default_wg", 123, WorkGroup::DEFAULT_VERSION, 1, 0.5, 0, 0.9,
WorkGroupType::WG_DEFAULT, WorkGroup::DEFAULT_MEM_POOL)};

const auto tracker{manager.get_parent_mem_tracker(work_group)};
const auto tracker{manager.register_workgroup(work_group)};
ASSERT_EQ(tracker, GlobalEnv::GetInstance()->query_pool_mem_tracker_shared());
}

Expand All @@ -38,8 +38,12 @@ PARALLEL_TEST(MemTrackerMangerTest, test_mem_tracker_for_custom_mem_pool) {
const auto work_group3{std::make_shared<WorkGroup>("wg_2", 134, WorkGroup::DEFAULT_VERSION, 1, 0.5, 0, 0.9,
WorkGroupType::WG_DEFAULT, "other_pool")};

ASSERT_EQ(manager.get_parent_mem_tracker(work_group1), manager.get_parent_mem_tracker(work_group2));
ASSERT_NE(manager.get_parent_mem_tracker(work_group1), manager.get_parent_mem_tracker(work_group3));
const MemTrackerPtr parent_workgroup_1 = manager.register_workgroup(work_group1);
const MemTrackerPtr parent_workgroup_2 = manager.register_workgroup(work_group2);
const MemTrackerPtr parent_workgroup_3 = manager.register_workgroup(work_group3);

ASSERT_EQ(parent_workgroup_1, parent_workgroup_2);
ASSERT_NE(parent_workgroup_1, parent_workgroup_3);
}
PARALLEL_TEST(MemTrackerMangerTest, test_mem_tracker_for_custom_mem_pool_overwrite) {
MemTrackerManager manager;
Expand All @@ -48,6 +52,9 @@ PARALLEL_TEST(MemTrackerMangerTest, test_mem_tracker_for_custom_mem_pool_overwri
const auto work_group2{std::make_shared<WorkGroup>("wg_2", 134, WorkGroup::DEFAULT_VERSION, 1, 0.7, 0, 0.9,
WorkGroupType::WG_DEFAULT, "test_pool")};

ASSERT_NE(manager.get_parent_mem_tracker(work_group1), manager.get_parent_mem_tracker(work_group2));
const MemTrackerPtr parent_workgroup1 = manager.register_workgroup(work_group1);
const MemTrackerPtr parent_workgroup2 = manager.register_workgroup(work_group2);

ASSERT_NE(parent_workgroup1, parent_workgroup2);
}
} // namespace starrocks::workgroup
43 changes: 43 additions & 0 deletions be/test/exec/workgroup/work_group_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ TWorkGroup create_twg(const int64_t id, const int64_t version, const std::string
return twg;
}

TWorkGroupOp make_twg_op(const TWorkGroup& twg, const TWorkGroupOpType::type op_type) {
TWorkGroupOp op;
op.__set_workgroup(twg);
op.__set_op_type(op_type);
return op;
}

PARALLEL_TEST(WorkGroupManagerTest, add_workgroups_different_mem_pools) {
PipelineExecutorSetConfig config{10, 1, 1, 1, CpuUtil::CpuIds{}, false, false, nullptr};
auto _manager = std::make_unique<WorkGroupManager>(config);
Expand Down Expand Up @@ -82,4 +89,40 @@ PARALLEL_TEST(WorkGroupManagerTest, add_workgroups_same_mem_pools) {
}
_manager->destroy();
}

PARALLEL_TEST(WorkGroupManagerTest, test_if_unused_memory_pools_are_cleaned_up) {
PipelineExecutorSetConfig config{10, 1, 1, 1, CpuUtil::CpuIds{}, false, false, nullptr};
auto _manager = std::make_unique<WorkGroupManager>(config);
_manager->set_workgroup_expiration_time(std::chrono::seconds(0));
{
auto twg1 = create_twg(110, 1, "wg110", "test_pool_2", 0.5);
auto twg2 = create_twg(111, 1, "wg111", "test_pool_2", 0.5);
auto twg3 = create_twg(112, 1, "wg112", WorkGroup::DEFAULT_MEM_POOL, 0.5);

std::vector create_operations{
make_twg_op(twg1, TWorkGroupOpType::WORKGROUP_OP_CREATE),
make_twg_op(twg2, TWorkGroupOpType::WORKGROUP_OP_CREATE),
make_twg_op(twg3, TWorkGroupOpType::WORKGROUP_OP_CREATE),
};

_manager->apply(create_operations);

EXPECT_EQ(_manager->list_memory_pools().size(), 2);

// Version must be strictly larger, otherwise workgroup will not be deleted
twg1.version++;
twg2.version++;

std::vector delete_operations{make_twg_op(twg1, TWorkGroupOpType::WORKGROUP_OP_DELETE),
make_twg_op(twg2, TWorkGroupOpType::WORKGROUP_OP_DELETE)};

_manager->apply(delete_operations);
std::this_thread::sleep_for(std::chrono::seconds(1));
// The expired workgroups will only get erased in the next call to apply
_manager->apply({});

EXPECT_EQ(_manager->list_memory_pools().size(), 1);
}
_manager->destroy();
}
} // namespace starrocks::workgroup
Loading