Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit f271f48

Browse files
committed
Move reduction code from ResultSetStorage.
Signed-off-by: ienkovich <[email protected]>
1 parent 590aea6 commit f271f48

8 files changed

+402
-321
lines changed

omniscidb/QueryEngine/Execute.cpp

+22-12
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
#include "QueryEngine/OutputBufferInitialization.h"
6060
#include "QueryEngine/QueryRewrite.h"
6161
#include "QueryEngine/QueryTemplateGenerator.h"
62+
#include "QueryEngine/ResultSetReduction.h"
6263
#include "QueryEngine/ResultSetReductionJIT.h"
6364
#include "QueryEngine/RuntimeFunctions.h"
6465
#include "QueryEngine/SpeculativeTopN.h"
@@ -1250,12 +1251,18 @@ ResultSetPtr Executor::reduceMultiDeviceResultSets(
12501251
reduced_results->initializeStorage();
12511252
switch (query_mem_desc.getEffectiveKeyWidth()) {
12521253
case 4:
1253-
first->getStorage()->moveEntriesToBuffer<int32_t>(
1254-
result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1254+
ResultSetReduction::moveEntriesToBuffer<int32_t>(
1255+
first->getStorage()->getQueryMemDesc(),
1256+
first->getStorage()->getUnderlyingBuffer(),
1257+
result_storage->getUnderlyingBuffer(),
1258+
query_mem_desc.getEntryCount());
12551259
break;
12561260
case 8:
1257-
first->getStorage()->moveEntriesToBuffer<int64_t>(
1258-
result_storage->getUnderlyingBuffer(), query_mem_desc.getEntryCount());
1261+
ResultSetReduction::moveEntriesToBuffer<int64_t>(
1262+
first->getStorage()->getQueryMemDesc(),
1263+
first->getStorage()->getUnderlyingBuffer(),
1264+
result_storage->getUnderlyingBuffer(),
1265+
query_mem_desc.getEntryCount());
12591266
break;
12601267
default:
12611268
CHECK(false);
@@ -1278,10 +1285,12 @@ ResultSetPtr Executor::reduceMultiDeviceResultSets(
12781285
(ResultSetStorage*)nullptr,
12791286
[&](auto r, ResultSetStorage* res) {
12801287
for (auto i = r.begin() + 1; i != r.end(); ++i) {
1281-
(*r.begin())->reduce(**i, {}, reduction_code, getConfig(), this);
1288+
ResultSetReduction::reduce(
1289+
**r.begin(), **i, {}, reduction_code, getConfig(), this);
12821290
}
12831291
if (res) {
1284-
res->reduce(*(*r.begin()), {}, reduction_code, getConfig(), this);
1292+
ResultSetReduction::reduce(
1293+
*res, *(*r.begin()), {}, reduction_code, getConfig(), this);
12851294
return res;
12861295
}
12871296
return *r.begin();
@@ -1293,16 +1302,17 @@ ResultSetPtr Executor::reduceMultiDeviceResultSets(
12931302
if (!rhs) {
12941303
return lhs;
12951304
}
1296-
lhs->reduce(*rhs, {}, reduction_code, getConfig(), this);
1305+
ResultSetReduction::reduce(*lhs, *rhs, {}, reduction_code, getConfig(), this);
12971306
return lhs;
12981307
});
12991308
} else {
13001309
for (size_t i = 1; i < results_per_device.size(); ++i) {
1301-
reduced_results->getStorage()->reduce(*(results_per_device[i].first->getStorage()),
1302-
{},
1303-
reduction_code,
1304-
getConfig(),
1305-
this);
1310+
ResultSetReduction::reduce(*reduced_results->getStorage(),
1311+
*(results_per_device[i].first->getStorage()),
1312+
{},
1313+
reduction_code,
1314+
getConfig(),
1315+
this);
13061316
}
13071317
}
13081318
reduced_results->addCompilationQueueTime(compilation_queue_time);

omniscidb/QueryEngine/QueryExecutionContext.cpp

+9-8
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "QueryMemoryInitializer.h"
2626
#include "RelAlgExecutionUnit.h"
2727
#include "ResultSet.h"
28+
#include "ResultSetReduction.h"
2829
#include "Shared/likely.h"
2930
#include "SpeculativeTopN.h"
3031
#include "StreamingTopN.h"
@@ -143,14 +144,14 @@ ResultSetPtr QueryExecutionContext::groupBufferToDeinterleavedResults(
143144
memcpy(&agg_vals[0],
144145
&executor_->plan_state_->init_agg_vals_[0],
145146
agg_col_count * sizeof(agg_vals[0]));
146-
ResultSetStorage::reduceSingleRow(rows_ptr + bin_base_off,
147-
executor_->warpSize(),
148-
false,
149-
true,
150-
agg_vals,
151-
query_mem_desc_,
152-
result_set->getTargetInfos(),
153-
executor_->plan_state_->init_agg_vals_);
147+
ResultSetReduction::reduceSingleRow(rows_ptr + bin_base_off,
148+
executor_->warpSize(),
149+
false,
150+
true,
151+
agg_vals,
152+
query_mem_desc_,
153+
result_set->getTargetInfos(),
154+
executor_->plan_state_->init_agg_vals_);
154155
for (size_t agg_idx = 0; agg_idx < agg_col_count;
155156
++agg_idx, ++deinterleaved_buffer_idx) {
156157
deinterleaved_buffer[deinterleaved_buffer_idx] = agg_vals[agg_idx];

0 commit comments

Comments
 (0)