Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit b5e1067

Browse files
committed
unambiguous executor
1 parent 2e963de commit b5e1067

14 files changed

+308
-234
lines changed

omniscidb/QueryEngine/CostModel/CostModel.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ class CostModel {
4949

5050
virtual void calibrate(const CaibrationConfig& conf);
5151
virtual std::unique_ptr<policy::ExecutionPolicy> predict(
52-
QueryInfo query_info) const = 0;
52+
QueryInfo query_info,
53+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
54+
const = 0;
5355

5456
protected:
5557
struct DeviceExtrapolations {

omniscidb/QueryEngine/CostModel/Dispatchers/DefaultExecutionPolicy.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ SchedulingAssignment FragmentIDAssignmentExecutionPolicy::scheduleSingleFragment
2424
int device_id = fragment.deviceIds[static_cast<int>(memory_level)];
2525
return {dt_, device_id};
2626
}
27-
std::vector<ExecutorDeviceType> FragmentIDAssignmentExecutionPolicy::devices() const {
27+
std::set<ExecutorDeviceType> FragmentIDAssignmentExecutionPolicy::devices() const {
2828
return {dt_};
2929
}
3030
} // namespace policy

omniscidb/QueryEngine/CostModel/Dispatchers/DefaultExecutionPolicy.h

+5-2
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
namespace policy {
1919
class FragmentIDAssignmentExecutionPolicy : public ExecutionPolicy {
2020
public:
21-
FragmentIDAssignmentExecutionPolicy(ExecutorDeviceType dt) : dt_(dt){};
21+
FragmentIDAssignmentExecutionPolicy(
22+
ExecutorDeviceType dt,
23+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
24+
: ExecutionPolicy(devices_dispatch_modes), dt_(dt){};
2225
SchedulingAssignment scheduleSingleFragment(const FragmentInfo&,
2326
size_t frag_id,
2427
size_t frag_num) const override;
25-
std::vector<ExecutorDeviceType> devices() const override;
28+
std::set<ExecutorDeviceType> devices() const override;
2629
std::string name() const override { return "ExecutionPolicy::FragmentIDAssignment"; };
2730

2831
private:

omniscidb/QueryEngine/CostModel/Dispatchers/ExecutionPolicy.h

+30-9
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,49 @@ struct SchedulingAssignment {
2727
};
2828

2929
class ExecutionPolicy {
30+
// Probe/modify modes during kernel building (do not iterate). These are the default
31+
// modes.
32+
const std::map<ExecutorDeviceType, ExecutorDispatchMode> devices_dispatch_modes_{
33+
{ExecutorDeviceType::CPU, ExecutorDispatchMode::KernelPerFragment},
34+
{ExecutorDeviceType::GPU, ExecutorDispatchMode::KernelPerFragment}};
35+
3036
public:
37+
ExecutionPolicy(
38+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
39+
: devices_dispatch_modes_(devices_dispatch_modes){};
3140
virtual SchedulingAssignment scheduleSingleFragment(const FragmentInfo&,
3241
size_t frag_id,
3342
size_t frag_num) const = 0;
34-
virtual std::vector<ExecutorDeviceType> devices() const {
35-
return {ExecutorDeviceType::CPU, ExecutorDeviceType::GPU};
43+
44+
virtual std::set<ExecutorDeviceType> devices() const {
45+
std::set<ExecutorDeviceType> res;
46+
for (const auto& dt_mode : devices_dispatch_modes_) {
47+
res.insert(dt_mode.first);
48+
}
49+
return res;
50+
}
51+
52+
virtual bool hasDevice(const ExecutorDeviceType dt) const {
53+
return (devices_dispatch_modes_.count(dt) != 0);
54+
}
55+
56+
virtual ExecutorDispatchMode getExecutionMode(const ExecutorDeviceType dt) const {
57+
CHECK(hasDevice(dt));
58+
return devices_dispatch_modes_.at(dt);
59+
}
60+
61+
virtual std::map<ExecutorDeviceType, ExecutorDispatchMode> getExecutionModes() const {
62+
return devices_dispatch_modes_;
3663
}
3764
virtual std::string name() const = 0;
3865

3966
virtual ~ExecutionPolicy() = default;
40-
41-
// Probe/modify modes during kernel building (do not iterate). These are the default
42-
// modes.
43-
std::unordered_map<ExecutorDeviceType, ExecutorDispatchMode> devices_dispatch_modes{
44-
{ExecutorDeviceType::CPU, ExecutorDispatchMode::KernelPerFragment},
45-
{ExecutorDeviceType::GPU, ExecutorDispatchMode::KernelPerFragment}};
4667
};
4768

4869
inline std::ostream& operator<<(std::ostream& os, const ExecutionPolicy& policy) {
4970
os << policy.name() << "\n";
5071
os << "Dispatching modes: \n";
51-
for (const auto& device_disp_mode : policy.devices_dispatch_modes) {
72+
for (const auto& device_disp_mode : policy.getExecutionModes()) {
5273
os << device_disp_mode.first << " - " << device_disp_mode.second << "\n";
5374
}
5475
return os;

omniscidb/QueryEngine/CostModel/Dispatchers/ProportionBasedExecutionPolicy.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
namespace policy {
2020

2121
ProportionBasedExecutionPolicy::ProportionBasedExecutionPolicy(
22-
std::map<ExecutorDeviceType, unsigned>&& propotion) {
22+
std::map<ExecutorDeviceType, unsigned>&& propotion,
23+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
24+
: ExecutionPolicy(devices_dispatch_modes) {
2325
CHECK_GT(propotion.size(), 0u);
2426
proportion_.merge(propotion);
2527
total_parts_ = std::accumulate(

omniscidb/QueryEngine/CostModel/Dispatchers/ProportionBasedExecutionPolicy.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ namespace policy {
2424
*/
2525
class ProportionBasedExecutionPolicy : public ExecutionPolicy {
2626
public:
27-
ProportionBasedExecutionPolicy(std::map<ExecutorDeviceType, unsigned>&& proportion);
27+
ProportionBasedExecutionPolicy(
28+
std::map<ExecutorDeviceType, unsigned>&& proportion,
29+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes);
2830
SchedulingAssignment scheduleSingleFragment(const FragmentInfo&,
2931
size_t frag_id,
3032
size_t frag_num) const override;

omniscidb/QueryEngine/CostModel/Dispatchers/RRExecutionPolicy.h

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
namespace policy {
1919
class RoundRobinExecutionPolicy : public ExecutionPolicy {
2020
public:
21+
RoundRobinExecutionPolicy(
22+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
23+
: ExecutionPolicy(devices_dispatch_modes){};
24+
2125
SchedulingAssignment scheduleSingleFragment(const FragmentInfo&,
2226
size_t frag_id,
2327
size_t frag_num) const override;

omniscidb/QueryEngine/CostModel/IterativeCostModel.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ IterativeCostModel::IterativeCostModel()
3131
#endif
3232

3333
std::unique_ptr<policy::ExecutionPolicy> IterativeCostModel::predict(
34-
QueryInfo query_info) const {
34+
QueryInfo query_info,
35+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
36+
const {
3537
std::shared_lock<std::shared_mutex> l(latch_);
3638

3739
unsigned cpu_prop = 1, gpu_prop = 0;
@@ -74,6 +76,7 @@ std::unique_ptr<policy::ExecutionPolicy> IterativeCostModel::predict(
7476
proportion[ExecutorDeviceType::GPU] = gpu_prop;
7577
proportion[ExecutorDeviceType::CPU] = cpu_prop;
7678

77-
return std::make_unique<policy::ProportionBasedExecutionPolicy>(std::move(proportion));
79+
return std::make_unique<policy::ProportionBasedExecutionPolicy>(std::move(proportion),
80+
devices_dispatch_modes);
7881
}
7982
} // namespace costmodel

omniscidb/QueryEngine/CostModel/IterativeCostModel.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ class IterativeCostModel : public CostModel {
2424
IterativeCostModel();
2525
IterativeCostModel(CostModelConfig config) : CostModel(std::move(config)) {}
2626

27-
virtual std::unique_ptr<policy::ExecutionPolicy> predict(QueryInfo query_info) const;
27+
virtual std::unique_ptr<policy::ExecutionPolicy> predict(
28+
QueryInfo query_info,
29+
const std::map<ExecutorDeviceType, ExecutorDispatchMode>& devices_dispatch_modes)
30+
const;
2831

2932
private:
3033
static constexpr size_t optimization_iterations_ = 1024;

omniscidb/QueryEngine/Descriptors/QueryFragmentDescriptor.cpp

+33-28
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
9999
Executor* executor,
100100
compiler::CodegenTraitsDescriptor cgen_traits_desc) {
101101
const auto inner_table_id_to_join_condition = executor->getInnerTabIdToJoinCond();
102-
LOG(INFO) << *policy;
102+
LOG(INFO) << "Building Kernel Fragment table with policy: " << *policy;
103103
for (size_t frag_id = 0; frag_id < fragments->size(); frag_id++) {
104104
if (!allowed_outer_fragment_indices_.empty()) {
105105
if (std::find(allowed_outer_fragment_indices_.begin(),
@@ -162,7 +162,7 @@ void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
162162
const auto table_id = ra_exe_unit.input_descs[table_desc_idx].getTableId();
163163
auto table_frags_it = selected_tables_fragments_.find({db_id, table_id});
164164
CHECK(table_frags_it != selected_tables_fragments_.end());
165-
if (policy->devices_dispatch_modes.at(device_type) ==
165+
if (policy->getExecutionMode(device_type) ==
166166
ExecutorDispatchMode::KernelPerFragment) {
167167
execution_kernel_desc.fragments.emplace_back(
168168
FragmentsPerTable{db_id, table_id, frag_ids});
@@ -198,7 +198,7 @@ void QueryFragmentDescriptor::buildFragmentPerKernelForTable(
198198
}
199199
LOG(DEBUG1) << "Assigning frag_id=" << frag_id << "/" << fragments->size() - 1
200200
<< " to " << device_type << ", device_id=" << device_id;
201-
if (policy->devices_dispatch_modes.at(device_type) ==
201+
if (policy->getExecutionMode(device_type) ==
202202
ExecutorDispatchMode::KernelPerFragment) {
203203
auto itr = execution_kernels_per_device_[device_type].find(device_id);
204204
if (itr == execution_kernels_per_device_[device_type].end()) {
@@ -237,31 +237,36 @@ void QueryFragmentDescriptor::buildFragmentPerKernelMapForUnion(
237237
j,
238238
executor,
239239
cgen_traits_desc);
240-
241-
std::vector<int> table_cpu_ids =
242-
std::accumulate(execution_kernels_per_device_[ExecutorDeviceType::CPU][0].begin(),
243-
execution_kernels_per_device_[ExecutorDeviceType::CPU][0].end(),
244-
std::vector<int>(),
245-
[](auto&& vec, auto& exe_kern) {
246-
vec.push_back(exe_kern.fragments[0].table_id);
247-
return vec;
248-
});
249-
std::vector<int> table_gpu_ids =
250-
std::accumulate(execution_kernels_per_device_[ExecutorDeviceType::GPU][0].begin(),
251-
execution_kernels_per_device_[ExecutorDeviceType::GPU][0].end(),
252-
std::vector<int>(),
253-
[](auto&& vec, auto& exe_kern) {
254-
vec.push_back(exe_kern.fragments[0].table_id);
255-
return vec;
256-
});
257-
VLOG(1) << "execution_kernels_per_device_[CPU].size()="
258-
<< execution_kernels_per_device_[ExecutorDeviceType::CPU].size()
259-
<< " execution_kernels_per_device_[CPU][0][*].fragments[0].table_id="
260-
<< shared::printContainer(table_cpu_ids);
261-
VLOG(1) << "execution_kernels_per_device_[GPU].size()="
262-
<< execution_kernels_per_device_[ExecutorDeviceType::GPU].size()
263-
<< " execution_kernels_per_device_[GPU][0][*].fragments[0].table_id="
264-
<< shared::printContainer(table_gpu_ids);
240+
if (policy->hasDevice(ExecutorDeviceType::CPU)) {
241+
CHECK(execution_kernels_per_device_.count(ExecutorDeviceType::CPU));
242+
std::vector<int> table_cpu_ids = std::accumulate(
243+
execution_kernels_per_device_.at(ExecutorDeviceType::CPU)[0].begin(),
244+
execution_kernels_per_device_.at(ExecutorDeviceType::CPU)[0].end(),
245+
std::vector<int>(),
246+
[](auto&& vec, auto& exe_kern) {
247+
vec.push_back(exe_kern.fragments[0].table_id);
248+
return vec;
249+
});
250+
VLOG(1) << "execution_kernels_per_device_[CPU].size()="
251+
<< execution_kernels_per_device_.at(ExecutorDeviceType::CPU).size()
252+
<< " execution_kernels_per_device_[CPU][0][*].fragments[0].table_id="
253+
<< shared::printContainer(table_cpu_ids);
254+
}
255+
if (policy->hasDevice(ExecutorDeviceType::GPU)) {
256+
CHECK(execution_kernels_per_device_.count(ExecutorDeviceType::GPU));
257+
std::vector<int> table_gpu_ids = std::accumulate(
258+
execution_kernels_per_device_.at(ExecutorDeviceType::GPU)[0].begin(),
259+
execution_kernels_per_device_.at(ExecutorDeviceType::GPU)[0].end(),
260+
std::vector<int>(),
261+
[](auto&& vec, auto& exe_kern) {
262+
vec.push_back(exe_kern.fragments[0].table_id);
263+
return vec;
264+
});
265+
VLOG(1) << "execution_kernels_per_device_[GPU].size()="
266+
<< execution_kernels_per_device_.at(ExecutorDeviceType::GPU).size()
267+
<< " execution_kernels_per_device_[GPU][0][*].fragments[0].table_id="
268+
<< shared::printContainer(table_gpu_ids);
269+
}
265270
}
266271
}
267272

omniscidb/QueryEngine/Descriptors/QueryFragmentDescriptor.h

+16-17
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,12 @@ class QueryFragmentDescriptor {
8787
template <typename DISPATCH_FCN>
8888
void dispatchKernelsToDevices(DISPATCH_FCN dispatcher_f,
8989
const RelAlgExecutionUnit& ra_exe_unit,
90-
policy::ExecutionPolicy* policy) const {
90+
const policy::ExecutionPolicy* policy) const {
9191
std::unordered_map<ExecutorDeviceType, std::unordered_map<int, size_t>>
9292
execution_kernel_index;
9393
size_t tuple_count = 0;
9494
for (const auto& device_type_itr : execution_kernels_per_device_) {
95-
if (policy->devices_dispatch_modes.at(device_type_itr.first) ==
95+
if (policy->getExecutionMode(device_type_itr.first) ==
9696
ExecutorDispatchMode::KernelPerFragment) {
9797
for (const auto& device_itr : device_type_itr.second) {
9898
CHECK(execution_kernel_index[device_type_itr.first]
@@ -103,7 +103,7 @@ class QueryFragmentDescriptor {
103103
}
104104

105105
for (const auto& device_type_itr : execution_kernels_per_device_) {
106-
if (policy->devices_dispatch_modes.at(device_type_itr.first) ==
106+
if (policy->getExecutionMode(device_type_itr.first) ==
107107
ExecutorDispatchMode::MultifragmentKernel) {
108108
for (const auto& device_itr : device_type_itr.second) {
109109
const auto& execution_kernels = device_itr.second;
@@ -115,22 +115,21 @@ class QueryFragmentDescriptor {
115115
bool dispatch_finished = false;
116116
while (!dispatch_finished) {
117117
dispatch_finished = true;
118-
for (const auto& device_type_itr : execution_kernels_per_device_)
119-
for (const auto& device_itr : device_type_itr.second) {
120-
auto& kernel_idx =
121-
execution_kernel_index[device_type_itr.first][device_itr.first];
122-
if (kernel_idx < device_itr.second.size()) {
123-
dispatch_finished = false;
124-
const auto& execution_kernel = device_itr.second[kernel_idx++];
125-
dispatcher_f(device_itr.first,
126-
execution_kernel.fragments,
127-
rowid_lookup_key_,
128-
device_type_itr.first);
129-
if (terminateDispatchMaybe(tuple_count, ra_exe_unit, execution_kernel)) {
130-
return;
131-
}
118+
for (const auto& device_itr : device_type_itr.second) {
119+
auto& kernel_idx =
120+
execution_kernel_index[device_type_itr.first][device_itr.first];
121+
if (kernel_idx < device_itr.second.size()) {
122+
dispatch_finished = false;
123+
const auto& execution_kernel = device_itr.second[kernel_idx++];
124+
dispatcher_f(device_itr.first,
125+
execution_kernel.fragments,
126+
rowid_lookup_key_,
127+
device_type_itr.first);
128+
if (terminateDispatchMaybe(tuple_count, ra_exe_unit, execution_kernel)) {
129+
return;
132130
}
133131
}
132+
}
134133
}
135134
}
136135
}

0 commit comments

Comments
 (0)