Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ set(EXEC_FILES
workgroup/pipeline_executor_set.cpp
workgroup/pipeline_executor_set_manager.cpp
workgroup/work_group.cpp
workgroup/mem_tracker_manager.cpp
workgroup/scan_executor.cpp
workgroup/scan_task_queue.cpp
query_cache/multilane_operator.cpp
Expand Down
54 changes: 54 additions & 0 deletions be/src/exec/workgroup/mem_tracker_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "mem_tracker_manager.h"

#include <memory>

#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "work_group.h"

namespace starrocks::workgroup {
// Returns the tracker that `wg`'s own RESOURCE_GROUP tracker should be parented to.
//
// - If `wg` uses WorkGroup::DEFAULT_MEM_POOL, the global query-pool tracker is returned.
// - Otherwise a RESOURCE_GROUP_SHARED_MEMORY_POOL tracker keyed by the mem_pool name is
//   returned. It is created with the query-pool tracker as its parent and a limit of
//   query_pool_limit * wg->mem_limit(), then cached in _shared_mem_trackers for reuse.
//
// This method mutates _shared_mem_trackers and takes no lock itself.
// NOTE(review): presumably callers serialize access (e.g. under the workgroup manager lock) — confirm.
MemTrackerManager::MemTrackerPtr MemTrackerManager::get_parent_mem_tracker(const WorkGroupPtr& wg) {
    // Bind the pool name once: mem_pool() may return by value, and the name is needed
    // for the comparison, the cache lookup, the tracker label and the cache insert.
    const auto& pool_name = wg->mem_pool();
    if (WorkGroup::DEFAULT_MEM_POOL == pool_name) {
        return GlobalEnv::GetInstance()->query_pool_mem_tracker_shared();
    }

    MemTracker* const query_pool_tracker = GlobalEnv::GetInstance()->query_pool_mem_tracker();
    const double mem_limit_fraction = wg->mem_limit();
    const int64_t memory_limit_bytes = static_cast<int64_t>(query_pool_tracker->limit() * mem_limit_fraction);

    // Frontend (FE) validation ensures that active resource groups (RGs) sharing
    // the same mem_pool also have the same mem_limit.
    //
    // We must still handle an edge case:
    // 1. All RGs using a specific mem_pool are deleted.
    // 2. The shared tracker for that pool remains cached here.
    // 3. A new RG is created with the same mem_pool name but a different mem_limit.
    // Therefore the cached tracker is reused only if its limit matches the current RG's limit;
    // otherwise it is replaced below.
    if (const auto cached = _shared_mem_trackers.find(pool_name);
        cached != _shared_mem_trackers.end() && cached->second->limit() == memory_limit_bytes) {
        return cached->second;
    }

    auto shared_mem_tracker = std::make_shared<MemTracker>(MemTrackerType::RESOURCE_GROUP_SHARED_MEMORY_POOL,
                                                           memory_limit_bytes, pool_name, query_pool_tracker);

    // insert_or_assign overwrites a stale entry (limit mismatch) as well as filling a missing one.
    _shared_mem_trackers.insert_or_assign(pool_name, shared_mem_tracker);
    return shared_mem_tracker;
}
} // namespace starrocks::workgroup
31 changes: 31 additions & 0 deletions be/src/exec/workgroup/mem_tracker_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <string>
#include <unordered_map>

#include "runtime/mem_tracker.h"
#include "work_group_fwd.h"

namespace starrocks::workgroup {
// Hands out the parent MemTracker for a workgroup and caches the per-mem_pool
// "shared" trackers so that workgroups naming the same mem_pool are parented to
// the same tracker instance.
//
// The type does no locking of its own.
// NOTE(review): presumably the owner (WorkGroupManager) serializes calls — confirm.
struct MemTrackerManager {
public:
    using MemTrackerPtr = std::shared_ptr<MemTracker>;

    // Returns the tracker `wg` should use as the parent of its own tracker:
    // the global query-pool tracker for WorkGroup::DEFAULT_MEM_POOL, otherwise the
    // shared tracker for wg->mem_pool(), created and cached on first use.
    MemTrackerPtr get_parent_mem_tracker(const WorkGroupPtr& wg);

private:
    // mem_pool name -> shared tracker for that pool.
    std::unordered_map<std::string, MemTrackerPtr> _shared_mem_trackers;
};
} // namespace starrocks::workgroup
38 changes: 27 additions & 11 deletions be/src/exec/workgroup/work_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

#include "common/config.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/workgroup/mem_tracker_manager.h"
#include "exec/workgroup/pipeline_executor_set.h"
#include "exec/workgroup/scan_task_queue.h"
#include "glog/logging.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "util/cpu_info.h"
#include "util/metrics.h"
#include "util/starrocks_metrics.h"
Expand Down Expand Up @@ -96,7 +98,7 @@ RunningQueryToken::~RunningQueryToken() {
}

WorkGroup::WorkGroup(std::string name, int64_t id, int64_t version, size_t cpu_limit, double memory_limit,
size_t concurrency, double spill_mem_limit_threshold, WorkGroupType type)
size_t concurrency, double spill_mem_limit_threshold, WorkGroupType type, std::string mem_pool)
: _name(std::move(name)),
_id(id),
_version(version),
Expand All @@ -105,6 +107,7 @@ WorkGroup::WorkGroup(std::string name, int64_t id, int64_t version, size_t cpu_l
_memory_limit(memory_limit),
_concurrency_limit(concurrency),
_spill_mem_limit_threshold(spill_mem_limit_threshold),
_mem_pool(std::move(mem_pool)),
_driver_sched_entity(this),
_scan_sched_entity(this),
_connector_scan_sched_entity(this) {}
Expand Down Expand Up @@ -154,6 +157,11 @@ WorkGroup::WorkGroup(const TWorkGroup& twg)
if (twg.__isset.spill_mem_limit_threshold) {
_spill_mem_limit_threshold = twg.spill_mem_limit_threshold;
}
if (twg.__isset.mem_pool) {
_mem_pool = twg.mem_pool;
} else {
_mem_pool = DEFAULT_MEM_POOL;
}
}

TWorkGroup WorkGroup::to_thrift() const {
Expand Down Expand Up @@ -182,13 +190,20 @@ TWorkGroup WorkGroup::to_thrift_verbose() const {
return twg;
}

void WorkGroup::init() {
_memory_limit_bytes = _memory_limit == ABSENT_MEMORY_LIMIT
? GlobalEnv::GetInstance()->query_pool_mem_tracker()->limit()
: GlobalEnv::GetInstance()->query_pool_mem_tracker()->limit() * _memory_limit;
void WorkGroup::init(std::shared_ptr<MemTracker>& parent_mem_tracker) {
if (parent_mem_tracker->type() == MemTrackerType::RESOURCE_GROUP_SHARED_MEMORY_POOL) {
_memory_limit_bytes = parent_mem_tracker->limit();
_shared_mem_tracker = parent_mem_tracker;
} else {
_memory_limit_bytes = _memory_limit == ABSENT_MEMORY_LIMIT ? parent_mem_tracker->limit()
: parent_mem_tracker->limit() * _memory_limit;
}

_spill_mem_limit_bytes = _spill_mem_limit_threshold * _memory_limit_bytes;

//todo (m.bogusz) MemTracker can only handle raw ptr parent so we need to add parent_mem_tracker as member to workgroup
_mem_tracker = std::make_shared<MemTracker>(MemTrackerType::RESOURCE_GROUP, _memory_limit_bytes, _name,
GlobalEnv::GetInstance()->query_pool_mem_tracker());
parent_mem_tracker.get());
_mem_tracker->set_reserve_limit(_spill_mem_limit_bytes);

_driver_sched_entity.set_queue(std::make_unique<pipeline::QuerySharedDriverQueue>(
Expand Down Expand Up @@ -275,7 +290,7 @@ void WorkGroup::copy_metrics(const WorkGroup& rhs) {
// ------------------------------------------------------------------------------------

WorkGroupManager::WorkGroupManager(PipelineExecutorSetConfig executors_manager_conf)
: _executors_manager(this, std::move(executors_manager_conf)) {}
: _executors_manager(this, std::move(executors_manager_conf)), _shared_mem_tracker_manager{} {}

WorkGroupManager::~WorkGroupManager() = default;

Expand Down Expand Up @@ -549,7 +564,8 @@ void WorkGroupManager::create_workgroup_unlocked(const WorkGroupPtr& wg, UniqueL
return;
}

wg->init();
auto parent_mem_tracker = _shared_mem_tracker_manager.get_parent_mem_tracker(wg);
wg->init(parent_mem_tracker);
_workgroups[unique_id] = wg;

_sum_cpu_weight += wg->cpu_weight();
Expand Down Expand Up @@ -690,7 +706,8 @@ std::shared_ptr<WorkGroup> DefaultWorkGroupInitialization::create_default_workgr
const double memory_limit = 1.0;
const double spill_mem_limit_threshold = 1.0; // not enable spill mem limit threshold
return std::make_shared<WorkGroup>("default_wg", WorkGroup::DEFAULT_WG_ID, WorkGroup::DEFAULT_VERSION, cpu_limit,
memory_limit, 0, spill_mem_limit_threshold, WorkGroupType::WG_DEFAULT);
memory_limit, 0, spill_mem_limit_threshold, WorkGroupType::WG_DEFAULT,
WorkGroup::DEFAULT_MEM_POOL);
}

std::shared_ptr<WorkGroup> DefaultWorkGroupInitialization::create_default_mv_workgroup() {
Expand All @@ -700,7 +717,6 @@ std::shared_ptr<WorkGroup> DefaultWorkGroupInitialization::create_default_mv_wor
double mv_spill_mem_limit_threshold = config::default_mv_resource_group_spill_mem_limit_threshold;
return std::make_shared<WorkGroup>("default_mv_wg", WorkGroup::DEFAULT_MV_WG_ID, WorkGroup::DEFAULT_MV_VERSION,
mv_cpu_limit, mv_memory_limit, mv_concurrency_limit,
mv_spill_mem_limit_threshold, WorkGroupType::WG_MV);
mv_spill_mem_limit_threshold, WorkGroupType::WG_MV, WorkGroup::DEFAULT_MEM_POOL);
}

} // namespace starrocks::workgroup
15 changes: 10 additions & 5 deletions be/src/exec/workgroup/work_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "exec/pipeline/pipeline_driver_queue.h"
#include "exec/pipeline/query_context.h"
#include "exec/workgroup/work_group_fwd.h"
#include "mem_tracker_manager.h"
#include "pipeline_executor_set_manager.h"
#include "runtime/mem_tracker.h"
#include "storage/olap_define.h"
Expand Down Expand Up @@ -114,11 +115,11 @@ using RunningQueryTokenPtr = std::unique_ptr<RunningQueryToken>;
class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
public:
WorkGroup(std::string name, int64_t id, int64_t version, size_t cpu_weight, double memory_limit, size_t concurrency,
double spill_mem_limit_threshold, WorkGroupType type);
double spill_mem_limit_threshold, WorkGroupType type, std::string mem_pool);
explicit WorkGroup(const TWorkGroup& twg);
~WorkGroup() = default;

void init();
void init(std::shared_ptr<MemTracker>& parent_mem_tracker);

TWorkGroup to_thrift() const;
TWorkGroup to_thrift_verbose() const;
Expand All @@ -128,6 +129,7 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
void copy_metrics(const WorkGroup& rhs);

MemTracker* mem_tracker() { return _mem_tracker.get(); }

std::shared_ptr<MemTracker> grab_mem_tracker() { return _mem_tracker; }
const MemTracker* mem_tracker() const { return _mem_tracker.get(); }
MemTracker* connector_scan_mem_tracker() { return _connector_scan_mem_tracker.get(); }
Expand All @@ -137,7 +139,7 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
const std::string& name() const { return _name; }
size_t cpu_weight() const { return _cpu_weight; }
size_t exclusive_cpu_cores() const { return _exclusive_cpu_cores; }
size_t mem_limit() const { return _memory_limit; }
double mem_limit() const { return _memory_limit; }
int64_t mem_limit_bytes() const { return _memory_limit_bytes; }

int64_t mem_consumption_bytes() const { return _mem_tracker == nullptr ? 0L : _mem_tracker->consumption(); }
Expand Down Expand Up @@ -198,7 +200,7 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
int64_t big_query_scan_rows_limit() const { return _big_query_scan_rows_limit; }
void incr_cpu_runtime_ns(int64_t delta_ns) { _cpu_runtime_ns += delta_ns; }
int64_t cpu_runtime_ns() const { return _cpu_runtime_ns; }

std::string mem_pool() const { return _mem_pool; }
void set_shared_executors(PipelineExecutorSet* executors) { _executors = executors; }
void set_exclusive_executors(std::unique_ptr<PipelineExecutorSet> executors) {
_exclusive_executors = std::move(executors);
Expand All @@ -212,6 +214,7 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
static constexpr int64 DEFAULT_MV_WG_ID = 1;
static constexpr int64 DEFAULT_VERSION = 0;
static constexpr int64 DEFAULT_MV_VERSION = 1;
// Pool name assigned to workgroups that do not specify a mem_pool.
// Declared const so this process-wide constant cannot be modified at runtime.
inline static const std::string DEFAULT_MEM_POOL{"default_mem_pool"};

// Yield scan io task when maximum time in nano-seconds has spent in current execution round.
static constexpr int64_t YIELD_MAX_TIME_SPENT = 100'000'000L;
Expand Down Expand Up @@ -240,7 +243,9 @@ class WorkGroup : public std::enable_shared_from_this<WorkGroup> {
double _spill_mem_limit_threshold = 1.0;
int64_t _spill_mem_limit_bytes = -1;

std::string _mem_pool;
std::shared_ptr<MemTracker> _mem_tracker = nullptr;
std::shared_ptr<MemTracker> _shared_mem_tracker = nullptr;
std::shared_ptr<MemTracker> _connector_scan_mem_tracker = nullptr;

WorkGroupDriverSchedEntity _driver_sched_entity;
Expand Down Expand Up @@ -330,7 +335,7 @@ class WorkGroupManager {
std::list<int128_t> _workgroup_expired_versions;

std::atomic<size_t> _sum_cpu_weight = 0;

MemTrackerManager _shared_mem_tracker_manager;
std::once_flag init_metrics_once_flag;
std::unordered_map<std::string, WorkGroupMetricsPtr> _wg_metrics;
};
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class GlobalEnv {

MemTracker* process_mem_tracker() { return _process_mem_tracker.get(); }
MemTracker* query_pool_mem_tracker() { return _query_pool_mem_tracker.get(); }
std::shared_ptr<MemTracker> query_pool_mem_tracker_shared() { return _query_pool_mem_tracker; }
MemTracker* connector_scan_pool_mem_tracker() { return _connector_scan_pool_mem_tracker.get(); }
MemTracker* load_mem_tracker() { return _load_mem_tracker.get(); }
MemTracker* metadata_mem_tracker() { return _metadata_mem_tracker.get(); }
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ std::string MemTracker::err_msg(const std::string& msg, RuntimeState* state) con
<< "You can change the limit by modifying [mem_limit] of this group";
}
break;
case MemTrackerType::RESOURCE_GROUP_SHARED_MEMORY_POOL:
str << "Mem usage has exceed the limit of resource group memory pool [" << label() << "]. ";
break;
case MemTrackerType::RESOURCE_GROUP_BIG_QUERY:
str << "Mem usage has exceed the big query limit of the resource group [" << label() << "]. "
<< "You can change the limit by modifying [big_query_mem_limit] of this group";
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ enum class MemTrackerType {
SCHEMA_CHANGE_TASK,
SCHEMA_CHANGE,
RESOURCE_GROUP,
RESOURCE_GROUP_SHARED_MEMORY_POOL,
RESOURCE_GROUP_BIG_QUERY,
JEMALLOC,
PASSTHROUGH,
Expand Down
2 changes: 2 additions & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ set(EXEC_FILES
./exec/paimon/paimon_delete_file_builder_test.cpp
./exec/workgroup/scan_task_queue_test.cpp
./exec/workgroup/pipeline_executor_set_test.cpp
./exec/workgroup/work_group_manager_test.cpp
./exec/workgroup/mem_tracker_manager_test.cpp
./exec/pipeline/schedule/observer_test.cpp
./exec/pipeline/pipeline_control_flow_test.cpp
./exec/pipeline/pipeline_driver_queue_test.cpp
Expand Down
10 changes: 5 additions & 5 deletions be/test/exec/pipeline/mem_limited_chunk_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ class MemLimitedChunkQueueTest : public ::testing::Test {
auto fs = FileSystem::Default();
ASSERT_OK(fs->create_dir_recursive(path));
LOG(INFO) << "path: " << path;

dummy_wg = std::make_shared<workgroup::WorkGroup>("default_wg", workgroup::WorkGroup::DEFAULT_WG_ID,
workgroup::WorkGroup::DEFAULT_VERSION, 4, 100.0, 0, 1.0,
workgroup::WorkGroupType::WG_DEFAULT);
dummy_wg->init();
auto parent = GlobalEnv::GetInstance()->query_pool_mem_tracker_shared();
dummy_wg = std::make_shared<workgroup::WorkGroup>(
"default_wg", workgroup::WorkGroup::DEFAULT_WG_ID, workgroup::WorkGroup::DEFAULT_VERSION, 4, 100.0, 0,
1.0, workgroup::WorkGroupType::WG_DEFAULT, workgroup::WorkGroup::DEFAULT_MEM_POOL);
dummy_wg->init(parent);
dummy_wg->set_shared_executors(ExecEnv::GetInstance()->workgroup_manager()->shared_executors());

dummy_dir_mgr = std::make_unique<spill::DirManager>();
Expand Down
12 changes: 8 additions & 4 deletions be/test/exec/pipeline/pipeline_driver_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,17 @@ class WorkGroupDriverQueueTest : public ::testing::Test {
public:
void SetUp() override {
_wg1 = std::make_shared<workgroup::WorkGroup>("wg100", 100, workgroup::WorkGroup::DEFAULT_VERSION, 1, 0.5, 10,
1.0, workgroup::WorkGroupType::WG_NORMAL);
1.0, workgroup::WorkGroupType::WG_NORMAL,
workgroup::WorkGroup::DEFAULT_MEM_POOL);
_wg2 = std::make_shared<workgroup::WorkGroup>("wg200", 200, workgroup::WorkGroup::DEFAULT_VERSION, 2, 0.5, 10,
1.0, workgroup::WorkGroupType::WG_NORMAL);
1.0, workgroup::WorkGroupType::WG_NORMAL,
workgroup::WorkGroup::DEFAULT_MEM_POOL);
_wg3 = std::make_shared<workgroup::WorkGroup>("wg300", 300, workgroup::WorkGroup::DEFAULT_VERSION, 1, 0.5, 10,
1.0, workgroup::WorkGroupType::WG_NORMAL);
1.0, workgroup::WorkGroupType::WG_NORMAL,
workgroup::WorkGroup::DEFAULT_MEM_POOL);
_wg4 = std::make_shared<workgroup::WorkGroup>("wg400", 400, workgroup::WorkGroup::DEFAULT_VERSION, 1, 0.5, 10,
1.0, workgroup::WorkGroupType::WG_NORMAL);
1.0, workgroup::WorkGroupType::WG_NORMAL,
workgroup::WorkGroup::DEFAULT_MEM_POOL);
_wg1 = ExecEnv::GetInstance()->workgroup_manager()->add_workgroup(_wg1);
_wg2 = ExecEnv::GetInstance()->workgroup_manager()->add_workgroup(_wg2);
_wg3 = ExecEnv::GetInstance()->workgroup_manager()->add_workgroup(_wg3);
Expand Down
6 changes: 3 additions & 3 deletions be/test/exec/pipeline/query_context_manger_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ TEST(QueryContextManagerTest, testSetWorkgroup) {
auto query_ctx_mgr = std::make_shared<QueryContextManager>(6);
ASSERT_TRUE(query_ctx_mgr->init().ok());

workgroup::WorkGroupPtr wg = std::make_shared<workgroup::WorkGroup>("wg1", 1, 1, 1, 1, 1 /* concurrency_limit */,
1.0 /* spill_mem_limit_threshold */,
workgroup::WorkGroupType::WG_NORMAL);
workgroup::WorkGroupPtr wg = std::make_shared<workgroup::WorkGroup>(
"wg1", 1, 1, 1, 1, 1 /* concurrency_limit */, 1.0 /* spill_mem_limit_threshold */,
workgroup::WorkGroupType::WG_NORMAL, workgroup::WorkGroup::DEFAULT_MEM_POOL);

auto* query_ctx1 = gen_query_ctx(parent_mem_tracker.get(), query_ctx_mgr.get(), 0, 1, 3, 60, 300);
auto* query_ctx_overloaded = gen_query_ctx(parent_mem_tracker.get(), query_ctx_mgr.get(), 1, 2, 3, 60, 300);
Expand Down
Loading
Loading