This repository was archived by the owner on May 9, 2024. It is now read-only.

Proper heterogeneous execution modes #652

Merged 3 commits on Sep 26, 2023
omniscidb/QueryEngine/Descriptors/QueryFragmentDescriptor.cpp
@@ -333,6 +333,7 @@ void QueryFragmentDescriptor::buildMultifragKernelMap(
if (device_type == ExecutorDeviceType::GPU) {
checkDeviceMemoryUsage(fragment, device_id, num_bytes_for_row);
}

for (size_t j = 0; j < ra_exe_unit.input_descs.size(); ++j) {
const auto db_id = ra_exe_unit.input_descs[j].getDatabaseId();
const auto table_id = ra_exe_unit.input_descs[j].getTableId();
50 changes: 50 additions & 0 deletions omniscidb/QueryEngine/Descriptors/QueryFragmentDescriptor.h
@@ -100,6 +100,56 @@ class QueryFragmentDescriptor {
}
}

template <typename DISPATCH_FCN>
void assignFragsToMultiHeterogeneousDispatch(
Contributor (review comment on this line): Having a separate method is a bit ugly (that was the reason we didn't merge it a long time ago), but I think we can live with that for now.

DISPATCH_FCN dispatcher_f,
const RelAlgExecutionUnit& ra_exe_unit) const {
std::unordered_map<int, size_t> cpu_execution_kernel_index;
size_t tuple_count = 0;

if (execution_kernels_per_device_.count(ExecutorDeviceType::CPU)) {
cpu_execution_kernel_index.reserve(
execution_kernels_per_device_.at(ExecutorDeviceType::CPU).size());
for (const auto& device_itr :
execution_kernels_per_device_.at(ExecutorDeviceType::CPU)) {
CHECK(
cpu_execution_kernel_index.insert(std::make_pair(device_itr.first, size_t(0)))
.second);
}
}

for (const auto& device_type_itr : execution_kernels_per_device_) {
if (device_type_itr.first == ExecutorDeviceType::GPU) {
for (const auto& device_itr : device_type_itr.second) {
const auto& execution_kernels = device_itr.second;
CHECK_EQ(execution_kernels.size(), size_t(1));
const auto& fragments_list = execution_kernels.front().fragments;
dispatcher_f(
device_itr.first, fragments_list, rowid_lookup_key_, device_type_itr.first);
}
} else {
bool dispatch_finished = false;
while (!dispatch_finished) {
dispatch_finished = true;
for (const auto& device_itr : device_type_itr.second) {
auto& kernel_idx = cpu_execution_kernel_index[device_itr.first];
if (kernel_idx < device_itr.second.size()) {
dispatch_finished = false;
const auto& execution_kernel = device_itr.second[kernel_idx++];
dispatcher_f(device_itr.first,
execution_kernel.fragments,
rowid_lookup_key_,
device_type_itr.first);
if (terminateDispatchMaybe(tuple_count, ra_exe_unit, execution_kernel)) {
return;
}
}
}
}
}
}
}

/**
* Dispatch one fragment for each device. Iterate the device map and dispatch one kernel
* for each device per iteration. This allows balanced dispatch as well as early
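For orientation, here is a standalone sketch of the dispatch order that the new `assignFragsToMultiHeterogeneousDispatch` above implements. All names in it (`Kernel`, `DeviceType`, `dispatch_heterogeneous`) are illustrative stand-ins rather than code from this PR, and the early exit via `terminateDispatchMaybe` is omitted; the assumption mirrored from the method is that each GPU device carries exactly one multi-fragment kernel, while CPU kernels are handed out round-robin, one per CPU worker per pass.

```cpp
// Standalone sketch (stand-in types, not the PR's code): mimics the dispatch
// order of assignFragsToMultiHeterogeneousDispatch.
#include <cstdio>
#include <map>
#include <vector>

enum class DeviceType { CPU, GPU };

struct Kernel {
  std::vector<int> fragment_ids;  // stand-in for ExecutionKernel::fragments
};

// device type -> (device id -> kernels), mirroring execution_kernels_per_device_
using KernelMap = std::map<DeviceType, std::map<int, std::vector<Kernel>>>;

template <typename DISPATCH_FCN>
void dispatch_heterogeneous(const KernelMap& kernels, DISPATCH_FCN dispatch) {
  for (const auto& [device_type, per_device] : kernels) {
    if (device_type == DeviceType::GPU) {
      // Assumed: each GPU owns a single multi-fragment kernel; dispatch it whole.
      for (const auto& [device_id, device_kernels] : per_device) {
        dispatch(device_id, device_kernels.front().fragment_ids, device_type);
      }
    } else {
      // CPU kernels go out round-robin: one kernel per device per pass, which
      // keeps the per-device queues balanced and allows an early stop.
      std::map<int, size_t> next_kernel;
      bool done = false;
      while (!done) {
        done = true;
        for (const auto& [device_id, device_kernels] : per_device) {
          size_t& idx = next_kernel[device_id];
          if (idx < device_kernels.size()) {
            done = false;
            dispatch(device_id, device_kernels[idx++].fragment_ids, device_type);
          }
        }
      }
    }
  }
}

int main() {
  KernelMap kernels;
  kernels[DeviceType::GPU][0] = {Kernel{{0, 1, 2, 3}}};      // one multi-frag kernel
  kernels[DeviceType::CPU][0] = {Kernel{{4}}, Kernel{{5}}};  // per-fragment kernels
  kernels[DeviceType::CPU][1] = {Kernel{{6}}};

  dispatch_heterogeneous(kernels,
                         [](int device_id, const std::vector<int>& frags, DeviceType type) {
                           std::printf("%s device %d gets %zu fragment(s)\n",
                                       type == DeviceType::GPU ? "GPU" : "CPU",
                                       device_id,
                                       frags.size());
                         });
}
```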
141 changes: 92 additions & 49 deletions omniscidb/QueryEngine/Execute.cpp
@@ -2790,66 +2790,109 @@ std::vector<std::unique_ptr<ExecutionKernel>> Executor::createHeterogeneousKernels(

CHECK(!ra_exe_unit.input_descs.empty());

const bool use_multifrag_kernel = eo.allow_multifrag && is_agg;
Contributor (review comment on this line): This reads as if group-bys would not use a multi-fragment policy. Needs cleanup, I guess.

fragment_descriptor.buildFragmentKernelMap(ra_exe_unit,
shared_context.getFragOffsets(),
policy,
available_cpus + available_gpus.size(),
false, /*multifrag policy unsupported yet*/
use_multifrag_kernel,
this,
co.codegen_traits_desc);

if (allow_single_frag_table_opt && query_mem_descs.count(ExecutorDeviceType::GPU) &&
(query_mem_descs.at(ExecutorDeviceType::GPU)->getQueryDescriptionType() ==
QueryDescriptionType::Projection) &&
table_infos.size() == 1) {
const auto max_frag_size = table_infos.front().info.getFragmentNumTuplesUpperBound();
if (max_frag_size < query_mem_descs.at(ExecutorDeviceType::GPU)->getEntryCount()) {
LOG(INFO) << "Lowering scan limit from "
<< query_mem_descs.at(ExecutorDeviceType::GPU)->getEntryCount()
<< " to match max fragment size " << max_frag_size
<< " for kernel per fragment execution path.";
throw CompilationRetryNewScanLimit(max_frag_size);
if (use_multifrag_kernel) {
LOG(INFO) << "use_multifrag_kernel=" << use_multifrag_kernel;
Contributor (review comment on this line): This is debug information. Same for the others.

size_t frag_list_idx{0};
auto multifrag_heterogeneous_kernel_dispatch =
[&ra_exe_unit,
&execution_kernels,
&column_fetcher,
&co,
&eo,
&frag_list_idx,
&query_comp_descs,
&query_mem_descs](const int device_id,
const FragmentsList& frag_list,
const int64_t rowid_lookup_key,
const ExecutorDeviceType device_type) {
if (!frag_list.size()) {
return;
}
CHECK_GE(device_id, 0);

execution_kernels.emplace_back(std::make_unique<ExecutionKernel>(
ra_exe_unit,
device_type,
device_id,
co,
eo,
column_fetcher,
*query_comp_descs.at(device_type).get(),
*query_mem_descs.at(device_type).get(),
frag_list,
device_type == ExecutorDeviceType::CPU
? ExecutorDispatchMode::KernelPerFragment
: ExecutorDispatchMode::MultifragmentKernel,
rowid_lookup_key));

++frag_list_idx;
};
fragment_descriptor.assignFragsToMultiHeterogeneousDispatch(
multifrag_heterogeneous_kernel_dispatch, ra_exe_unit);
} else {
if (allow_single_frag_table_opt && query_mem_descs.count(ExecutorDeviceType::GPU) &&
(query_mem_descs.at(ExecutorDeviceType::GPU)->getQueryDescriptionType() ==
QueryDescriptionType::Projection) &&
table_infos.size() == 1) {
const auto max_frag_size =
table_infos.front().info.getFragmentNumTuplesUpperBound();
if (max_frag_size < query_mem_descs.at(ExecutorDeviceType::GPU)->getEntryCount()) {
LOG(INFO) << "Lowering scan limit from "
<< query_mem_descs.at(ExecutorDeviceType::GPU)->getEntryCount()
<< " to match max fragment size " << max_frag_size
<< " for kernel per fragment execution path.";
throw CompilationRetryNewScanLimit(max_frag_size);
}
}
}

size_t frag_list_idx{0};
auto fragment_per_kernel_dispatch = [&ra_exe_unit,
&execution_kernels,
&column_fetcher,
&co,
&eo,
&frag_list_idx,
&query_comp_descs,
&query_mem_descs](
const int device_id,
const FragmentsList& frag_list,
const int64_t rowid_lookup_key,
const ExecutorDeviceType device_type) {
if (!frag_list.size()) {
return;
}
CHECK_GE(device_id, 0);
CHECK(query_comp_descs.count(device_type));
CHECK(query_mem_descs.count(device_type));

execution_kernels.emplace_back(
std::make_unique<ExecutionKernel>(ra_exe_unit,
device_type,
device_id,
co,
eo,
column_fetcher,
*query_comp_descs.at(device_type).get(),
*query_mem_descs.at(device_type).get(),
frag_list,
ExecutorDispatchMode::KernelPerFragment,
rowid_lookup_key));
++frag_list_idx;
};
size_t frag_list_idx{0};
auto fragment_per_kernel_dispatch = [&ra_exe_unit,
&execution_kernels,
&column_fetcher,
&co,
&eo,
&frag_list_idx,
&query_comp_descs,
&query_mem_descs](
const int device_id,
const FragmentsList& frag_list,
const int64_t rowid_lookup_key,
const ExecutorDeviceType device_type) {
if (!frag_list.size()) {
return;
}
CHECK_GE(device_id, 0);
CHECK(query_comp_descs.count(device_type));
CHECK(query_mem_descs.count(device_type));

fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
ra_exe_unit);
execution_kernels.emplace_back(
std::make_unique<ExecutionKernel>(ra_exe_unit,
device_type,
device_id,
co,
eo,
column_fetcher,
*query_comp_descs.at(device_type).get(),
*query_mem_descs.at(device_type).get(),
frag_list,
ExecutorDispatchMode::KernelPerFragment,
rowid_lookup_key));
++frag_list_idx;
};

fragment_descriptor.assignFragsToKernelDispatch(fragment_per_kernel_dispatch,
ra_exe_unit);
}
return execution_kernels;
}

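Condensing the new control flow in this hunk (a sketch with illustrative names, not the PR's code): the multi-fragment path is taken only when `eo.allow_multifrag && is_agg` (hence the review note about group-bys above), and on that path GPU kernels are built in `MultifragmentKernel` mode while CPU kernels stay in `KernelPerFragment` mode; on the fallback path every device gets one kernel per fragment.

```cpp
// Illustrative distillation of the path selection in createHeterogeneousKernels;
// names other than the quoted condition are stand-ins, not the PR's code.
#include <cstdio>

enum class DeviceType { CPU, GPU };
enum class DispatchMode { KernelPerFragment, MultifragmentKernel };

// Mirrors `const bool use_multifrag_kernel = eo.allow_multifrag && is_agg;`
bool use_multifrag_kernel(bool allow_multifrag, bool is_agg) {
  return allow_multifrag && is_agg;
}

DispatchMode pick_dispatch_mode(bool multifrag_path, DeviceType device_type) {
  // Only GPUs switch to multi-fragment kernels; CPUs keep per-fragment kernels.
  if (multifrag_path && device_type == DeviceType::GPU) {
    return DispatchMode::MultifragmentKernel;
  }
  return DispatchMode::KernelPerFragment;
}

int main() {
  const bool multifrag = use_multifrag_kernel(/*allow_multifrag=*/true, /*is_agg=*/true);
  const auto name = [](DispatchMode m) {
    return m == DispatchMode::MultifragmentKernel ? "MultifragmentKernel"
                                                  : "KernelPerFragment";
  };
  std::printf("GPU mode: %s\n", name(pick_dispatch_mode(multifrag, DeviceType::GPU)));
  std::printf("CPU mode: %s\n", name(pick_dispatch_mode(multifrag, DeviceType::CPU)));
}
```

Keeping the CPU side in `KernelPerFragment` mode even on the multi-fragment path is what lets CPU workers retain balanced per-fragment dispatch while each GPU consumes its whole fragment list at once.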