From e1d6f6053f6ea32c39c9c59148b487f63b5bc183 Mon Sep 17 00:00:00 2001 From: ienkovich Date: Wed, 14 Jun 2023 13:23:01 -0500 Subject: [PATCH] Support partitioned aggregation. Signed-off-by: ienkovich --- omniscidb/ConfigBuilder/ConfigBuilder.cpp | 39 ++ omniscidb/IR/ExprCollector.h | 11 + omniscidb/IR/ExprVisitor.h | 1 + omniscidb/IR/Node.cpp | 101 ++++- omniscidb/IR/Node.h | 82 +++- .../QueryEngine/CardinalityEstimator.cpp | 3 + omniscidb/QueryEngine/CardinalityEstimator.h | 16 + omniscidb/QueryEngine/CgenState.cpp | 10 +- omniscidb/QueryEngine/CgenState.h | 3 + omniscidb/QueryEngine/CompilationOptions.h | 6 + omniscidb/QueryEngine/Execute.cpp | 200 ++++++++- omniscidb/QueryEngine/Execute.h | 16 +- omniscidb/QueryEngine/ExecutionKernel.cpp | 4 +- omniscidb/QueryEngine/ExecutionKernel.h | 2 +- omniscidb/QueryEngine/ExprDagVisitor.h | 11 + omniscidb/QueryEngine/MemoryLayoutBuilder.cpp | 24 +- .../QueryEngine/QueryExecutionContext.cpp | 58 +-- .../QueryPhysicalInputsCollector.cpp | 10 + omniscidb/QueryEngine/QueryRewrite.cpp | 5 +- omniscidb/QueryEngine/RelAlgExecutionUnit.h | 12 + omniscidb/QueryEngine/RelAlgExecutor.cpp | 397 +++++++++++++++--- omniscidb/QueryEngine/RelAlgExecutor.h | 5 + omniscidb/QueryEngine/RelAlgVisitor.h | 6 + omniscidb/QueryEngine/RowFuncBuilder.cpp | 177 +++++++- omniscidb/QueryEngine/RowFuncBuilder.h | 12 + omniscidb/QueryEngine/TargetExprBuilder.cpp | 31 ++ omniscidb/QueryEngine/TargetExprBuilder.h | 14 + .../Visitors/TemplateAggregationVisitor.h | 4 + omniscidb/QueryEngine/WorkUnitBuilder.cpp | 97 ++++- omniscidb/QueryEngine/WorkUnitBuilder.h | 5 + omniscidb/Shared/Config.h | 7 + omniscidb/Shared/MathUtils.cpp | 18 +- omniscidb/Shared/MathUtils.h | 27 +- .../Tests/ArrowSQLRunner/ArrowSQLRunner.cpp | 22 + .../Tests/ArrowSQLRunner/ArrowSQLRunner.h | 4 + omniscidb/Tests/CMakeLists.txt | 4 + omniscidb/Tests/PartitionedGroupByTest.cpp | 177 ++++++++ 37 files changed, 1479 insertions(+), 142 deletions(-) create mode 100644 omniscidb/Tests/PartitionedGroupByTest.cpp diff --git a/omniscidb/ConfigBuilder/ConfigBuilder.cpp b/omniscidb/ConfigBuilder/ConfigBuilder.cpp index 57e08f186a..d73299ce88 100644 --- a/omniscidb/ConfigBuilder/ConfigBuilder.cpp +++ b/omniscidb/ConfigBuilder/ConfigBuilder.cpp @@ -183,6 +183,39 @@ bool ConfigBuilder::parseCommandLineArgs(int argc, po::value(&config_->exec.group_by.large_ndv_multiplier) ->default_value(config_->exec.group_by.large_ndv_multiplier), "A multiplier applied to NDV estimator buffer size for large ranges."); + opt_desc.add_options()( + "enable-cpu-partitioned-groupby", + po::value(&config_->exec.group_by.enable_cpu_partitioned_groupby) + ->default_value(config_->exec.group_by.enable_cpu_partitioned_groupby) + ->implicit_value(true), + "Enable partitioned aggregation on CPU."); + opt_desc.add_options()( + "groupby-partitioning-buffer-size-threshold", + po::value(&config_->exec.group_by.partitioning_buffer_size_threshold) + ->default_value(config_->exec.group_by.partitioning_buffer_size_threshold), + "Minimal estimated output buffer size to enable partitioned aggregation."); + opt_desc.add_options()( + "groupby-partitioning-group-size-threshold", + po::value(&config_->exec.group_by.partitioning_group_size_threshold) + ->default_value(config_->exec.group_by.partitioning_group_size_threshold), + "Maximum average estimated number of rows per output group to enable partitioned " + "aggregation."); + opt_desc.add_options()("groupby-min-partitions", + po::value(&config_->exec.group_by.min_partitions) + 
->default_value(config_->exec.group_by.min_partitions),
+ "The minimum number of partitions to be used for partitioned "
+ "aggregation. A value of 0 enables auto-detection.");
+ opt_desc.add_options()("groupby-max-partitions",
+ po::value(&config_->exec.group_by.max_partitions)
+ ->default_value(config_->exec.group_by.max_partitions),
+ "The maximum number of partitions to be used for partitioned "
+ "aggregation.");
+ opt_desc.add_options()(
+ "groupby-partitioning-target-buffer-size",
+ po::value(&config_->exec.group_by.partitioning_buffer_target_size)
+ ->default_value(config_->exec.group_by.partitioning_buffer_target_size),
+ "The preferred aggregation output buffer size used to compute the number of "
+ "partitions to use.");
 // exec.window
 opt_desc.add_options()("enable-window-functions",
@@ -349,6 +382,12 @@ bool ConfigBuilder::parseCommandLineArgs(int argc,
 ->implicit_value(true),
 "Enable multi-fragment intermediate results to improve execution parallelism for "
 "queries with multiple execution steps");
+ opt_desc.add_options()(
+ "enable-multifrag-execution-result",
+ po::value(&config_->exec.enable_multifrag_execution_result)
+ ->default_value(config_->exec.enable_multifrag_execution_result)
+ ->implicit_value(true),
+ "Enable multi-fragment final execution result");
 opt_desc.add_options()("gpu-block-size",
 po::value(&config_->exec.override_gpu_block_size)
 ->default_value(config_->exec.override_gpu_block_size),
diff --git a/omniscidb/IR/ExprCollector.h b/omniscidb/IR/ExprCollector.h
index 41c4528d1d..f1770c9dff 100644
--- a/omniscidb/IR/ExprCollector.h
+++ b/omniscidb/IR/ExprCollector.h
@@ -25,10 +25,21 @@ class ExprCollector : public ExprVisitor {
 return collect(expr.get(), std::forward(args)...);
 }
+ template
+ static ResultType collect(const ExprPtrVector& exprs, Ts&&...
args) {
+ CollectorType collector(std::forward(args)...);
+ for (auto& expr : exprs) {
+ collector.visit(expr.get());
+ }
+ return std::move(collector.result_);
+ }
+
 ResultType& result() { return result_; }
 const ResultType& result() const { return result_; }
 protected:
+ using BaseClass = ExprCollector;
+
 ResultType result_;
 };
diff --git a/omniscidb/IR/ExprVisitor.h b/omniscidb/IR/ExprVisitor.h
index 0faa7c4ca3..28aabc5100 100644
--- a/omniscidb/IR/ExprVisitor.h
+++ b/omniscidb/IR/ExprVisitor.h
@@ -115,6 +115,7 @@ class ExprVisitor {
 if (auto agg = expr->as()) {
 return visitAggExpr(agg);
 }
+ CHECK(false) << "Unhandled expr: " << expr->toString();
 return defaultResult(expr);
 }
diff --git a/omniscidb/IR/Node.cpp b/omniscidb/IR/Node.cpp
index eec7de74f1..6c715ebb4e 100644
--- a/omniscidb/IR/Node.cpp
+++ b/omniscidb/IR/Node.cpp
@@ -284,6 +284,90 @@ void LogicalUnion::checkForMatchingMetaInfoTypes() const {
 }
}
+size_t ShuffleFunction::hash() const {
+ size_t res = 0;
+ boost::hash_combine(res, kind);
+ boost::hash_combine(res, partitions);
+ return res;
+}
+
+std::string ShuffleFunction::toString() const {
+ std::stringstream ss;
+ ss << kind << "(" << partitions << ")";
+ return ss.str();
+}
+
+std::ostream& operator<<(std::ostream& os, const ShuffleFunction& fn) {
+ os << fn.toString();
+ return os;
+}
+
+std::ostream& operator<<(std::ostream& os, ShuffleFunction::Kind kind) {
+ os << ::toString(kind);
+ return os;
+}
+
+Shuffle::Shuffle(ExprPtrVector keys,
+ ExprPtr expr,
+ std::string field,
+ ShuffleFunction fn,
+ NodePtr input)
+ : keys_(keys), exprs_({std::move(expr)}), fields_({std::move(field)}), fn_(fn) {
+ inputs_.emplace_back(std::move(input));
+}
+
+Shuffle::Shuffle(ExprPtrVector keys,
+ ExprPtrVector exprs,
+ std::vector fields,
+ ShuffleFunction fn,
+ std::vector inputs)
+ : keys_(keys), exprs_(std::move(exprs)), fields_(std::move(fields)), fn_(fn) {
+ inputs_ = std::move(inputs);
+}
+
+std::string Shuffle::toString() const {
+ return cat(::typeName(this),
+ getIdString(),
+ "(keys=",
+ ::toString(keys_),
+ ", exprs=",
+ ::toString(exprs_),
+ ", fields=",
+ ::toString(fields_),
+ ", fn=",
+ ::toString(fn_),
+ ")");
+}
+
+size_t Shuffle::toHash() const {
+ if (!hash_) {
+ hash_ = typeid(Shuffle).hash_code();
+ for (auto& expr : keys_) {
+ boost::hash_combine(*hash_, expr->hash());
+ }
+ for (auto& expr : exprs_) {
+ boost::hash_combine(*hash_, expr->hash());
+ }
+ for (auto& field : fields_) {
+ boost::hash_combine(*hash_, field);
+ }
+ boost::hash_combine(*hash_, fn_.hash());
+ for (auto& node : inputs_) {
+ boost::hash_combine(*hash_, node->toHash());
+ }
+ }
+ return *hash_;
+}
+
+void Shuffle::rewriteExprs(hdk::ir::ExprRewriter& rewriter) {
+ for (size_t i = 0; i < keys_.size(); ++i) {
+ keys_[i] = rewriter.visit(keys_[i].get());
+ }
+ for (size_t i = 0; i < exprs_.size(); ++i) {
+ exprs_[i] = rewriter.visit(exprs_[i].get());
+ }
+}
+
namespace {
void collectNodes(NodePtr node, std::vector nodes) {
@@ -318,7 +402,7 @@ void QueryDag::resetQueryExecutionState() {
// TODO: always simply use node->size()
size_t getNodeColumnCount(const Node* node) {
 // Nodes that don't depend on input. 
- if (is_one_of(node)) { + if (is_one_of(node)) { return node->size(); } @@ -356,7 +440,8 @@ ExprPtrVector getNodeColumnRefs(const Node* node) { LogicalValues, Filter, Sort, - Join>(node)) { + Join, + Shuffle>(node)) { return genColumnRefs(node, getNodeColumnCount(node)); } @@ -374,7 +459,8 @@ ExprPtr getNodeColumnRef(const Node* node, unsigned index) { LogicalValues, Filter, Sort, - Join>(node)) { + Join, + Shuffle>(node)) { return makeExpr(getColumnType(node, index), node, index); } @@ -458,3 +544,12 @@ const Type* getColumnType(const Node* node, size_t col_idx) { } } // namespace hdk::ir + +std::string toString(hdk::ir::ShuffleFunction::Kind kind) { + switch (kind) { + case hdk::ir::ShuffleFunction::kHash: + return "Hash"; + } + LOG(FATAL) << "Invalid shuffle kind."; + return ""; +} diff --git a/omniscidb/IR/Node.h b/omniscidb/IR/Node.h index f266eda06e..06efcf21f0 100644 --- a/omniscidb/IR/Node.h +++ b/omniscidb/IR/Node.h @@ -377,7 +377,9 @@ class Aggregate : public Node { NodePtr input) : groupby_count_(groupby_count) , aggs_(std::move(aggs)) - , fields_(std::move(fields)) { + , fields_(std::move(fields)) + , partitioned_(false) + , buffer_entry_count_hint_(0) { inputs_.emplace_back(std::move(input)); } @@ -406,6 +408,12 @@ class Aggregate : public Node { void setAggExprs(ExprPtrVector new_aggs) { aggs_ = std::move(new_aggs); } + bool isPartitioned() const { return partitioned_; } + void setPartitioned(bool val) { partitioned_ = val; } + + size_t bufferEntryCountHint() const { return buffer_entry_count_hint_; } + void setBufferEntryCountHint(size_t val) { buffer_entry_count_hint_ = val; } + void rewriteExprs(hdk::ir::ExprRewriter& rewriter) override; std::string toString() const override { @@ -417,6 +425,7 @@ class Aggregate : public Node { ::toString(aggs_), ", fields=", ::toString(fields_), + (partitioned_ ? 
", partitioned" : ""),
 ", inputs=",
 inputsToString(inputs_),
 ")");
@@ -445,6 +454,8 @@ class Aggregate : public Node {
 const size_t groupby_count_;
 ExprPtrVector aggs_;
 std::vector fields_;
+ bool partitioned_;
+ size_t buffer_entry_count_hint_;
 };
 class Join : public Node {
@@ -593,7 +604,9 @@ class TranslatedJoin : public Node {
 CHECK(false);
 return nullptr;
 }
- const std::string& getFieldName(size_t i) const override { CHECK(false); }
+ const std::string& getFieldName(size_t i) const override {
+ throw std::runtime_error("Unexpected call to TranslatedJoin::getFieldName.");
+ }
 std::vector getJoinCols(bool lhs) const {
 if (lhs) {
 return lhs_join_cols_;
@@ -853,6 +866,69 @@ class LogicalUnion : public Node {
 bool const is_all_;
 };
+struct ShuffleFunction {
+ enum Kind {
+ kHash,
+ };
+
+ Kind kind;
+ size_t partitions;
+
+ size_t hash() const;
+ std::string toString() const;
+};
+
+std::ostream& operator<<(std::ostream& os, const ShuffleFunction& fn);
+std::ostream& operator<<(std::ostream& os, ShuffleFunction::Kind kind);
+
+class Shuffle : public Node {
+ public:
+ Shuffle(ExprPtrVector keys,
+ ExprPtr expr,
+ std::string field,
+ ShuffleFunction fn,
+ NodePtr input);
+ Shuffle(ExprPtrVector keys,
+ ExprPtrVector exprs,
+ std::vector fields,
+ ShuffleFunction fn,
+ std::vector inputs);
+ Shuffle(const Shuffle& other) = default;
+
+ const ExprPtrVector& keys() const { return keys_; }
+ const ExprPtrVector& exprs() const { return exprs_; }
+ const std::vector& fields() const { return fields_; }
+ ShuffleFunction fn() const { return fn_; }
+
+ size_t size() const override { return exprs_.size(); }
+
+ // A Shuffle node can be used both for computing partition sizes and for
+ // performing the actual partitioning. The first variant uses a COUNT
+ // aggregate as its only target expression.
+ bool isCount() const {
+ return exprs_.size() == (size_t)1 && exprs_.front()->is();
+ }
+
+ std::string toString() const override;
+ size_t toHash() const override;
+ void rewriteExprs(hdk::ir::ExprRewriter& rewriter) override;
+
+ std::shared_ptr deepCopy() const override {
+ return std::make_shared(*this);
+ }
+
+ const std::string& getFieldName(size_t i) const override {
+ CHECK_LT(i, fields_.size());
+ return fields_[i];
+ }
+
+ private:
+ ExprPtrVector keys_;
+ ExprPtrVector exprs_;
+ std::vector fields_;
+ ShuffleFunction fn_;
+};
+
class QueryNotSupported : public std::runtime_error {
 public:
 QueryNotSupported(const std::string& reason) : std::runtime_error(reason) {}
@@ -921,3 +997,5 @@ size_t getNodeColumnCount(const Node* node);
ExprPtr getJoinInputColumnRef(const ColumnRef* col_ref);
} // namespace hdk::ir
+
+std::string toString(hdk::ir::ShuffleFunction::Kind kind);
diff --git a/omniscidb/QueryEngine/CardinalityEstimator.cpp b/omniscidb/QueryEngine/CardinalityEstimator.cpp
index 811bfb0fcc..9ade402fc0 100644
--- a/omniscidb/QueryEngine/CardinalityEstimator.cpp
+++ b/omniscidb/QueryEngine/CardinalityEstimator.cpp
@@ -114,6 +114,9 @@ RelAlgExecutionUnit create_count_all_execution_unit(
 ra_exe_unit.hash_table_build_plan_dag,
 ra_exe_unit.table_id_to_node_map,
 ra_exe_unit.union_all,
+ ra_exe_unit.shuffle_fn,
+ ra_exe_unit.partition_offsets_col,
+ ra_exe_unit.partitioned_aggregation,
 ra_exe_unit.cost_model,
 {}}; // TODO(bagrorg): should we use costmodel here? 
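
As an aside, the two-pass protocol described in the Shuffle class comment above can be sketched as follows (a minimal sketch; makeCountExpr stands in for the AggExpr construction done in RelAlgExecutor::executeStepWithPartitionedAggregation later in this patch, and keys/cols/fields/input are assumed to be prepared by the caller):

    // Pass 1: a COUNT(*) shuffle builds a per-partition histogram.
    hdk::ir::ShuffleFunction fn{hdk::ir::ShuffleFunction::kHash, /*partitions=*/64};
    auto count_node = std::make_shared<hdk::ir::Shuffle>(
        keys, makeCountExpr(), "part_size", fn, input);
    // Pass 2: the actual shuffle consumes both the input and the histogram.
    auto shuffle_node = std::make_shared<hdk::ir::Shuffle>(
        keys, cols, fields, fn,
        std::vector<hdk::ir::NodePtr>({input, count_node}));
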
} diff --git a/omniscidb/QueryEngine/CardinalityEstimator.h b/omniscidb/QueryEngine/CardinalityEstimator.h index bb9eea03fb..29354bcda6 100644 --- a/omniscidb/QueryEngine/CardinalityEstimator.h +++ b/omniscidb/QueryEngine/CardinalityEstimator.h @@ -43,6 +43,22 @@ class CardinalityEstimationRequired : public std::runtime_error { const int64_t range_; }; +class RequestPartitionedAggregation : public std::runtime_error { + public: + RequestPartitionedAggregation(size_t entry_size, size_t estimated_buffer_entries) + : std::runtime_error("RequestPartitionedAggregation") + , entry_size_(entry_size) + , estimated_buffer_entries_(estimated_buffer_entries) {} + + size_t entrySize() const { return entry_size_; } + size_t estimatedBufferEntries() const { return estimated_buffer_entries_; } + size_t estimatedBufferSize() const { return entry_size_ * estimated_buffer_entries_; } + + private: + size_t entry_size_; + size_t estimated_buffer_entries_; +}; + RelAlgExecutionUnit create_ndv_execution_unit(const RelAlgExecutionUnit& ra_exe_unit, SchemaProvider* schema_provider, const Config& config, diff --git a/omniscidb/QueryEngine/CgenState.cpp b/omniscidb/QueryEngine/CgenState.cpp index 0a2912cce2..3bc94ad7eb 100644 --- a/omniscidb/QueryEngine/CgenState.cpp +++ b/omniscidb/QueryEngine/CgenState.cpp @@ -209,7 +209,8 @@ bool is_l0_module(const llvm::Module* m) { return m->getTargetTriple().rfind("spir", 0) == 0; } -llvm::Value* CgenState::emitCall(const std::string& fname, +llvm::Value* CgenState::emitCall(llvm::IRBuilder<>& ir_builder, + const std::string& fname, const std::vector& args) { // Get the function reference from the query module. auto func = module_->getFunction(fname); @@ -218,7 +219,12 @@ llvm::Value* CgenState::emitCall(const std::string& fname, // module. 
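
The RequestPartitionedAggregation exception above is the hand-off between the cardinality-estimation path and the partitioned-aggregation path. The catch site added to RelAlgExecutor::execute later in this patch uses it roughly like this (a condensed excerpt of that code, not new API):

    try {
      executeStep(seq.step(i), co, eo, queue_time_ms);
    } catch (const RequestPartitionedAggregation& e) {
      // estimatedBufferSize() == entrySize() * estimatedBufferEntries()
      executeStepWithPartitionedAggregation(
          seq.step(i), co, eo, e.estimatedBufferSize(), queue_time_ms);
    }
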
maybeCloneFunctionRecursive(func, is_l0_module(module_));
- return ir_builder_.CreateCall(func, args);
+ return ir_builder.CreateCall(func, args);
+}
+
+llvm::Value* CgenState::emitCall(const std::string& fname,
+ const std::vector& args) {
+ return emitCall(ir_builder_, fname, args);
}
void CgenState::emitErrorCheck(llvm::Value* condition,
diff --git a/omniscidb/QueryEngine/CgenState.h b/omniscidb/QueryEngine/CgenState.h
index 228b52091d..1ee47ebaca 100644
--- a/omniscidb/QueryEngine/CgenState.h
+++ b/omniscidb/QueryEngine/CgenState.h
@@ -283,6 +283,9 @@ struct CgenState {
 return result;
 }
+ llvm::Value* emitCall(llvm::IRBuilder<>& ir_builder,
+ const std::string& fname,
+ const std::vector& args);
 llvm::Value* emitCall(const std::string& fname, const std::vector& args);
 size_t getLiteralBufferUsage(const int device_id) {
diff --git a/omniscidb/QueryEngine/CompilationOptions.h b/omniscidb/QueryEngine/CompilationOptions.h
index 1de212a342..7555039bd2 100644
--- a/omniscidb/QueryEngine/CompilationOptions.h
+++ b/omniscidb/QueryEngine/CompilationOptions.h
@@ -156,6 +156,12 @@ struct ExecutionOptions {
 return eo;
 }
+ ExecutionOptions with_columnar_output(bool enable = true) const {
+ ExecutionOptions eo = *this;
+ eo.output_columnar_hint = enable;
+ return eo;
+ }
+
 private:
 ExecutionOptions() {}
};
diff --git a/omniscidb/QueryEngine/Execute.cpp b/omniscidb/QueryEngine/Execute.cpp
index cd1be50adb..fd9f097bd7 100644
--- a/omniscidb/QueryEngine/Execute.cpp
+++ b/omniscidb/QueryEngine/Execute.cpp
@@ -70,6 +70,7 @@
 #include "QueryEngine/SpeculativeTopN.h"
 #include "QueryEngine/StringDictionaryGenerations.h"
 #include "QueryEngine/Visitors/TransientStringLiteralsVisitor.h"
+#include "ResultSet/ColRangeInfo.h"
 #include "Shared/checked_alloc.h"
 #include "Shared/funcannotations.h"
 #include "Shared/measure.h"
@@ -1366,6 +1367,76 @@ ResultSetPtr Executor::reduceSpeculativeTopN(
 return m.asRows(ra_exe_unit, row_set_mem_owner, query_mem_desc, this, top_n, desc);
}
+hdk::ResultSetTable Executor::reducePartitionHistogram(
+ std::vector>>& results_per_device,
+ const QueryMemoryDescriptor& query_mem_desc,
+ std::shared_ptr row_set_mem_owner) const {
+ std::vector results;
+ std::vector buffers;
+
+ results.reserve(results_per_device.size() + 1);
+ buffers.reserve(results_per_device.size() + 1);
+
+ // In the reduction result we want each fragment result to hold an offset into
+ // the output buffer instead of a number of rows. We also want an additional
+ // result set holding the final partition sizes. Achieve both by using a new
+ // zero-filled buffer for the first fragment and partial sums for all others,
+ // with the last one holding the final partition sizes.
+ // Additionally, we want them to be columnar Projection instead of PerfectHash
+ // to later zero-copy fetch all rows.
+ using IndexedResultSet = std::pair>;
+ std::sort(results_per_device.begin(),
+ results_per_device.end(),
+ [](const IndexedResultSet& lhs, const IndexedResultSet& rhs) {
+ CHECK_GE(lhs.second.size(), size_t(1));
+ CHECK_GE(rhs.second.size(), size_t(1));
+ return lhs.second.front() < rhs.second.front();
+ });
+
+ auto parts = query_mem_desc.getEntryCount();
+ auto proj_mem_desc = query_mem_desc;
+ proj_mem_desc.setQueryDescriptionType(QueryDescriptionType::Projection);
+ proj_mem_desc.setHasKeylessHash(false);
+ proj_mem_desc.clearGroupColWidths();
+ // Currently, all perfect hash tables are expected to use 8 byte padded width. 
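
Concretely, the zero-fill plus partial-sum scheme described in the comment above behaves like this for two fragments and four partitions (hypothetical counts):

    // buffers[0] (new zero-filled buffer): {0, 0, 0, 0} -> offsets of fragment 0
    // buffers[1] (fragment 0 counts):      {2, 1, 0, 3} -> offsets of fragment 1
    // buffers[2] (fragment 1 counts):      {1, 2, 2, 1}
    // After the loop below applies buffers[i][j] += buffers[i - 1][j] for i >= 2:
    // buffers[2] = {3, 3, 2, 4} -> final partition sizes
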
+ CHECK_EQ(static_cast(proj_mem_desc.getPaddedSlotWidthBytes(0)), 8); + auto first_rs = + std::make_shared(results_per_device.front().first->getTargetInfos(), + ExecutorDeviceType::CPU, + proj_mem_desc, + row_set_mem_owner, + data_mgr_, + blockSize(), + gridSize()); + first_rs->allocateStorage(plan_state_->init_agg_vals_); + results.push_back(first_rs); + buffers.push_back( + reinterpret_cast(first_rs->getStorage()->getUnderlyingBuffer())); + for (auto& pr : results_per_device) { + auto buf = pr.first->getStorage()->getUnderlyingBuffer(); + auto proj_rs = + std::make_shared(results_per_device.front().first->getTargetInfos(), + ExecutorDeviceType::CPU, + proj_mem_desc, + row_set_mem_owner, + data_mgr_, + blockSize(), + gridSize()); + proj_rs->allocateStorage(buf, {}); + results.push_back(proj_rs); + buffers.push_back(reinterpret_cast(buf)); + } + + memset(buffers[0], 0, sizeof(int64_t) * parts); + for (size_t i = 2; i < buffers.size(); ++i) { + for (size_t j = 0; j < parts; ++j) { + buffers[i][j] += buffers[i - 1][j]; + } + } + + return hdk::ResultSetTable(std::move(results)); +} + namespace { std::unordered_set get_available_gpus(const Data_Namespace::DataMgr* data_mgr) { @@ -1628,6 +1699,15 @@ std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_uni if (ra_exe_unit.union_all) { os << "\n\tUnion: " << std::string(*ra_exe_unit.union_all ? "UNION ALL" : "UNION"); } + if (ra_exe_unit.shuffle_fn) { + os << "\n\tShuffle: " << *ra_exe_unit.shuffle_fn; + } + if (ra_exe_unit.partition_offsets_col) { + os << "\n\tPartition offsets column: " + << ra_exe_unit.partition_offsets_col->toString(); + } + os << "\n\tPartitioned aggregation: " << ra_exe_unit.partitioned_aggregation; + return os; } @@ -1649,6 +1729,9 @@ RelAlgExecutionUnit replace_scan_limit(const RelAlgExecutionUnit& ra_exe_unit_in ra_exe_unit_in.hash_table_build_plan_dag, ra_exe_unit_in.table_id_to_node_map, ra_exe_unit_in.union_all, + ra_exe_unit_in.shuffle_fn, + ra_exe_unit_in.partition_offsets_col, + ra_exe_unit_in.partitioned_aggregation, ra_exe_unit_in.cost_model, ra_exe_unit_in.templs}; } @@ -1695,6 +1778,7 @@ hdk::ResultSetTable Executor::executeWorkUnit( } return result; } catch (const CompilationRetryNewScanLimit& e) { + CHECK(!ra_exe_unit_in.shuffle_fn); auto result = executeWorkUnitImpl(max_groups_buffer_entry_guess, is_agg, @@ -1823,7 +1907,8 @@ hdk::ResultSetTable Executor::finishStreamExecution( *ctx->query_mem_desc, ctx->query_comp_desc->getDeviceType(), row_set_mem_owner_, - ctx->co); + ctx->co, + ctx->eo); } catch (ReductionRanOutOfSlots&) { throw QueryExecutionError(ERR_OUT_OF_SLOTS); } catch (QueryExecutionError& e) { @@ -1925,6 +2010,93 @@ Executor::getExecutionPolicyForTargets(const RelAlgExecutionUnit& ra_exe_unit, return {std::move(exe_policy), requested_device_type}; } +// TODO: move this code to QueryMemoryInitializer +void Executor::allocateShuffleBuffers( + const std::vector& query_infos, + const RelAlgExecutionUnit& ra_exe_unit, + std::shared_ptr row_set_mem_owner, + SharedKernelContext& shared_context) { + CHECK(ra_exe_unit.isShuffle()); + auto partitions = ra_exe_unit.shuffle_fn->partitions; + std::vector target_infos; + for (auto& expr : ra_exe_unit.target_exprs) { + CHECK(expr->is()) << "Unsupported expr: " << expr->toString(); + target_infos.push_back(get_target_info(expr, getConfig().exec.group_by.bigint_count)); + } + + ColSlotContext slot_context(ra_exe_unit.target_exprs, {}, false); + QueryMemoryDescriptor query_mem_desc( + getDataMgr(), + getConfigPtr(), + query_infos, + false 
/*approx_quantile*/, + false /*allow_multifrag*/, + false /*keyless_hash*/, + false /*interleaved_bins_on_gpu*/, + -1 /*idx_target_as_key*/, + ColRangeInfo{QueryDescriptionType::Projection, 0, 0, 0, true}, + slot_context, + {} /*group_col_widths*/, + 8 /*group_col_compact_width*/, + {} /*target_groupby_indices*/, + 0 /*entry_count*/, + {} /*count_distinct_descriptors*/, + false /*sort_on_gpu_hint*/, + true /*output_columnar*/, + false /*must_use_baseline_sort*/, + false /*use_streaming_top_n*/); + + // Get buffer holding partition sizes. It is stored in the last fragment + // of the second input table. + CHECK_EQ(ra_exe_unit.input_descs.size(), (size_t)2); + auto count_table_info = + schema_provider_->getTableInfo(ra_exe_unit.input_descs.back().getTableRef()); + CHECK_EQ(count_table_info->row_count, partitions * count_table_info->fragments); + auto count_cols = schema_provider_->listColumns(*count_table_info); + CHECK_EQ(count_cols.size(), (size_t)2); + auto count_col_info = count_cols.front(); + auto unpin = [](Data_Namespace::AbstractBuffer* buf) { buf->unPin(); }; + std::unique_ptr sizes_buf( + data_mgr_->getChunkBuffer({count_col_info->db_id, + count_col_info->table_id, + count_col_info->column_id, + static_cast(count_table_info->fragments)}, + Data_Namespace::MemoryLevel::CPU_LEVEL, + 0, + partitions * sizeof(uint64_t)), + unpin); + CHECK_EQ(sizes_buf->size(), partitions * sizeof(uint64_t)); + const uint64_t* sizes_ptr = + reinterpret_cast(sizes_buf->getMemoryPtr()); + + // Create result set for each output partition. + shuffle_out_bufs_.clear(); + shuffle_out_buf_ptrs_.clear(); + shuffle_out_bufs_.reserve(partitions); + shuffle_out_buf_ptrs_.reserve(partitions); + for (size_t i = 0; i < partitions; ++i) { + query_mem_desc.setEntryCount(sizes_ptr[i]); + auto rs = std::make_shared(target_infos, + ExecutorDeviceType::CPU, + query_mem_desc, + row_set_mem_owner, + getDataMgr(), + blockSize(), + gridSize()); + rs->allocateStorage({}); + + shuffle_out_bufs_.emplace_back(); + for (size_t target_idx = 0; target_idx < ra_exe_unit.target_exprs.size(); + ++target_idx) { + shuffle_out_bufs_.back().push_back( + const_cast(rs->getColumnarBuffer(target_idx))); + } + shuffle_out_buf_ptrs_.push_back(shuffle_out_bufs_.back().data()); + + shared_context.addDeviceResults(std::move(rs), 0, {i + 1}); + } +} + hdk::ResultSetTable Executor::executeWorkUnitImpl( size_t& max_groups_buffer_entry_guess, const bool is_agg, @@ -1949,6 +2121,11 @@ hdk::ResultSetTable Executor::executeWorkUnitImpl( column_fetcher.freeLinearizedBuf(); column_fetcher.freeTemporaryCpuLinearizedIdxBuf(); }; + + if (ra_exe_unit.isShuffle()) { + allocateShuffleBuffers(query_infos, ra_exe_unit, row_set_mem_owner, shared_context); + } + std::map> query_comp_descs_owned; std::map> @@ -2066,7 +2243,8 @@ hdk::ResultSetTable Executor::executeWorkUnitImpl( *query_mem_descs_owned[reduction_device_type], reduction_device_type, row_set_mem_owner, - co); + co, + eo); } catch (ReductionRanOutOfSlots&) { throw QueryExecutionError(ERR_OUT_OF_SLOTS); } catch (OverflowOrUnderflow&) { @@ -2361,13 +2539,14 @@ ResultSetPtr build_row_for_empty_input( } // namespace -ResultSetPtr Executor::collectAllDeviceResults( +hdk::ResultSetTable Executor::collectAllDeviceResults( SharedKernelContext& shared_context, const RelAlgExecutionUnit& ra_exe_unit, const QueryMemoryDescriptor& query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr row_set_mem_owner, - const CompilationOptions& co) { + const CompilationOptions& co, + const ExecutionOptions& eo) { 
auto timer = DEBUG_TIMER(__func__); auto& result_per_device = shared_context.getFragmentResults(); if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() == @@ -2375,6 +2554,16 @@ ResultSetPtr Executor::collectAllDeviceResults( return build_row_for_empty_input( this, ra_exe_unit.target_exprs, query_mem_desc, device_type); } + if (ra_exe_unit.shuffle_fn) { + // Reduction of shuffle COUNT(*) results. + CHECK(ra_exe_unit.isShuffleCount()) << "unexpected shuffle results"; + return reducePartitionHistogram(result_per_device, query_mem_desc, row_set_mem_owner); + } + // Partitioned aggregation results don't need to be merged unless it is required + // by execution options. + if (ra_exe_unit.partitioned_aggregation && eo.multifrag_result) { + return get_separate_results(result_per_device); + } if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) { try { return reduceSpeculativeTopN( @@ -2461,6 +2650,7 @@ std::vector> Executor::createKernels( // execution with no reduction required. if (device_type == ExecutorDeviceType::CPU && table_infos.size() == (size_t)1 && config_->exec.group_by.enable_cpu_multifrag_kernels && + !ra_exe_unit.partitioned_aggregation && (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::GroupByPerfectHash || query_mem_desc.getQueryDescriptionType() == @@ -2742,7 +2932,7 @@ std::vector Executor::getTableFragmentIndices( const auto outer_table_fragments = outer_table_fragments_it->second; CHECK(outer_table_fragments_it != selected_tables_fragments.end()); CHECK_LT(outer_frag_idx, outer_table_fragments->size()); - if (!table_idx) { + if (!table_idx || ra_exe_unit.isShuffle()) { return {outer_frag_idx}; } const auto& outer_fragment_info = (*outer_table_fragments)[outer_frag_idx]; diff --git a/omniscidb/QueryEngine/Execute.h b/omniscidb/QueryEngine/Execute.h index ae62314423..26ab6b6bd6 100644 --- a/omniscidb/QueryEngine/Execute.h +++ b/omniscidb/QueryEngine/Execute.h @@ -474,13 +474,14 @@ class Executor : public StringDictionaryProxyProvider { const std::vector& query_infos, size_t& max_groups_buffer_entry_guess); - ResultSetPtr collectAllDeviceResults( + hdk::ResultSetTable collectAllDeviceResults( SharedKernelContext& shared_context, const RelAlgExecutionUnit& ra_exe_unit, const QueryMemoryDescriptor& query_mem_desc, const ExecutorDeviceType device_type, std::shared_ptr row_set_mem_owner, - const CompilationOptions& co); + const CompilationOptions& co, + const ExecutionOptions& eo); std::unordered_map getInnerTabIdToJoinCond() const; @@ -655,7 +656,15 @@ class Executor : public StringDictionaryProxyProvider { std::vector>>& all_fragment_results, std::shared_ptr, const QueryMemoryDescriptor&) const; + hdk::ResultSetTable reducePartitionHistogram( + std::vector>>& results_per_device, + const QueryMemoryDescriptor& query_mem_desc, + std::shared_ptr row_set_mem_owner) const; + void allocateShuffleBuffers(const std::vector& query_infos, + const RelAlgExecutionUnit& ra_exe_unit, + std::shared_ptr row_set_mem_owner, + SharedKernelContext& shared_context); hdk::ResultSetTable executeWorkUnitImpl(size_t& max_groups_buffer_entry_guess, const bool is_agg, const bool allow_single_frag_table_opt, @@ -981,6 +990,9 @@ class Executor : public StringDictionaryProxyProvider { const TemporaryTables* temporary_tables_; TableIdToNodeMap table_id_to_node_map_; + std::vector> shuffle_out_bufs_; + std::vector shuffle_out_buf_ptrs_; + int64_t kernel_queue_time_ms_ = 0; int64_t compilation_queue_time_ms_ = 0; diff --git a/omniscidb/QueryEngine/ExecutionKernel.cpp 
b/omniscidb/QueryEngine/ExecutionKernel.cpp index 52786c7a60..326cd1b1c8 100644 --- a/omniscidb/QueryEngine/ExecutionKernel.cpp +++ b/omniscidb/QueryEngine/ExecutionKernel.cpp @@ -104,7 +104,7 @@ const std::vector& SharedKernelContext::getFragOffsets() { return all_frag_row_offsets_; } -void SharedKernelContext::addDeviceResults(ResultSetPtr&& device_results, +void SharedKernelContext::addDeviceResults(ResultSetPtr device_results, int outer_table_id, std::vector outer_table_fragment_ids) { std::lock_guard lock(reduce_mutex_); @@ -409,7 +409,7 @@ void ExecutionKernel::runImpl(Executor* executor, err = executor->executePlan(ra_exe_unit_, compilation_result, query_comp_desc.hoistLiterals(), - &device_results_, + ra_exe_unit_.isShuffle() ? nullptr : &device_results_, chosen_device_type, co, fetch_result->col_buffers, diff --git a/omniscidb/QueryEngine/ExecutionKernel.h b/omniscidb/QueryEngine/ExecutionKernel.h index 416cf928fc..bb90b47697 100644 --- a/omniscidb/QueryEngine/ExecutionKernel.h +++ b/omniscidb/QueryEngine/ExecutionKernel.h @@ -32,7 +32,7 @@ class SharedKernelContext { const std::vector& getFragOffsets(); - void addDeviceResults(ResultSetPtr&& device_results, + void addDeviceResults(ResultSetPtr device_results, int outer_table_id, std::vector outer_table_fragment_ids); diff --git a/omniscidb/QueryEngine/ExprDagVisitor.h b/omniscidb/QueryEngine/ExprDagVisitor.h index f2e74bc981..b28f417a19 100644 --- a/omniscidb/QueryEngine/ExprDagVisitor.h +++ b/omniscidb/QueryEngine/ExprDagVisitor.h @@ -33,6 +33,8 @@ class ExprDagVisitor : public ScalarExprVisitor { } else if (auto translated_join = dynamic_cast(node)) { visitTranslatedJoin(translated_join); + } else if (auto shuffle = dynamic_cast(node)) { + visitShuffle(shuffle); } else { LOG(FATAL) << "Unsupported node type: " << node->toString(); } @@ -84,6 +86,15 @@ class ExprDagVisitor : public ScalarExprVisitor { } } + virtual void visitShuffle(const hdk::ir::Shuffle* shuffle) { + for (auto& expr : shuffle->keys()) { + visit(expr.get()); + } + for (auto& expr : shuffle->exprs()) { + visit(expr.get()); + } + } + void* visitScalarSubquery(const hdk::ir::ScalarSubquery* subquery) const override { const_cast(this)->visit(subquery->node()); return nullptr; diff --git a/omniscidb/QueryEngine/MemoryLayoutBuilder.cpp b/omniscidb/QueryEngine/MemoryLayoutBuilder.cpp index 43b43a00e6..7ffed27b31 100644 --- a/omniscidb/QueryEngine/MemoryLayoutBuilder.cpp +++ b/omniscidb/QueryEngine/MemoryLayoutBuilder.cpp @@ -93,6 +93,20 @@ ColRangeInfo get_col_range_info(const RelAlgExecutionUnit& ra_exe_unit, std::optional group_cardinality_estimation, Executor* executor, const ExecutorDeviceType device_type) { + if (ra_exe_unit.shuffle_fn) { + CHECK(!ra_exe_unit.target_exprs.empty()); + // For shuffle COUNT(*) query we use keyless perfect hash. + if (ra_exe_unit.isShuffleCount()) { + return {QueryDescriptionType::GroupByPerfectHash, + 0, + static_cast(ra_exe_unit.shuffle_fn->partitions), + 0, + false}; + } + // For actual shuffle we use Projection. + CHECK(ra_exe_unit.isShuffle()); + return {QueryDescriptionType::Projection, 0, 1, 0, false}; + } const Config& config = executor->getConfig(); // Use baseline layout more eagerly on the GPU if the query uses count distinct, // because our HyperLogLog implementation is 4x less memory efficient on GPU. @@ -236,6 +250,11 @@ KeylessInfo get_keyless_info(const RelAlgExecutionUnit& ra_exe_unit, const std::vector& query_infos, const bool is_group_by, Executor* executor) { + // Shuffle counters always go keyless. 
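
Taken together with the ColRangeInfo above, the COUNT pass is logically just a keyless perfect-hash histogram over partition ids (a minimal sketch of the semantics, not the generated code; hash_keys is an illustrative stand-in for the generated key hashing):

    std::vector<int64_t> histogram(partitions, 0);
    for (const auto& row : input_rows) {
      // A power-of-2 partition count lets a mask replace the modulo
      // (see the rationale in RelAlgExecutor.cpp below).
      size_t part = hash_keys(row) & (partitions - 1);
      ++histogram[part];  // keyless COUNT slot; the partition id acts as the key
    }
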
+ if (ra_exe_unit.isShuffleCount()) { + return {true, 0}; + } + bool keyless{true}, found{false}; int32_t num_agg_expr{0}; int32_t index{0}; @@ -541,7 +560,8 @@ int8_t pick_target_compact_width(const RelAlgExecutionUnit& ra_exe_unit, const std::vector& query_infos, const int8_t crt_min_byte_width, bool bigint_count) { - if (bigint_count) { + // Currently, we cannot handle 32-bit shuffle counters. + if (bigint_count || ra_exe_unit.isShuffleCount()) { return sizeof(int64_t); } int8_t compact_width{0}; @@ -857,7 +877,7 @@ std::unique_ptr build_query_memory_descriptor( // (acts as a key) idx_target_as_key = keyless_info.target_index; - if (group_col_widths.size() > 1) { + if (ra_exe_unit.isShuffleCount() || group_col_widths.size() > 1) { // col range info max contains the expected cardinality of the output entry_count = static_cast(actual_col_range_info.max); actual_col_range_info.bucket = 0; diff --git a/omniscidb/QueryEngine/QueryExecutionContext.cpp b/omniscidb/QueryEngine/QueryExecutionContext.cpp index be7a7fbeae..41d50bec6b 100644 --- a/omniscidb/QueryEngine/QueryExecutionContext.cpp +++ b/omniscidb/QueryEngine/QueryExecutionContext.cpp @@ -588,6 +588,9 @@ std::vector QueryExecutionContext::launchCpuCode( } out_vec.push_back( reinterpret_cast(estimator_result_set_->getHostEstimatorBuffer())); + } else if (ra_exe_unit.isShuffle()) { + out_vec.push_back( + reinterpret_cast(executor_->shuffle_out_buf_ptrs_.data())); } else { if (!is_group_by) { for (size_t i = 0; i < init_agg_vals.size(); ++i) { @@ -631,6 +634,9 @@ std::vector QueryExecutionContext::launchCpuCode( join_hash_tables.size() == 1 ? reinterpret_cast(join_hash_tables[0]) : (join_hash_tables.size() > 1 ? &join_hash_tables[0] : nullptr); + int64_t** out_buffers = ra_exe_unit.isShuffle() || !is_group_by + ? 
out_vec.data() + : query_buffers_->getGroupByBuffersPtr(); if (hoist_literals) { using agg_query = void (*)(const int8_t***, // col_buffers const uint64_t*, // num_fragments @@ -645,19 +651,18 @@ std::vector QueryExecutionContext::launchCpuCode( const uint32_t*, // num_tables const int64_t*); // join_hash_tables_ptr if (is_group_by) { - reinterpret_cast(native_code->func())( - multifrag_cols_ptr, - &num_fragments, - literal_buff.data(), - num_rows_ptr, - flatened_frag_offsets.data(), - &scan_limit, - &total_matched_init, - cmpt_val_buff.data(), - query_buffers_->getGroupByBuffersPtr(), - error_code, - &num_tables, - join_hash_tables_ptr); + reinterpret_cast(native_code->func())(multifrag_cols_ptr, + &num_fragments, + literal_buff.data(), + num_rows_ptr, + flatened_frag_offsets.data(), + &scan_limit, + &total_matched_init, + cmpt_val_buff.data(), + out_buffers, + error_code, + &num_tables, + join_hash_tables_ptr); } else { reinterpret_cast(native_code->func())(multifrag_cols_ptr, &num_fragments, @@ -667,7 +672,7 @@ std::vector QueryExecutionContext::launchCpuCode( &scan_limit, &total_matched_init, init_agg_vals.data(), - out_vec.data(), + out_buffers, error_code, &num_tables, join_hash_tables_ptr); @@ -685,18 +690,17 @@ std::vector QueryExecutionContext::launchCpuCode( const uint32_t*, // num_tables const int64_t*); // join_hash_tables_ptr if (is_group_by) { - reinterpret_cast(native_code->func())( - multifrag_cols_ptr, - &num_fragments, - num_rows_ptr, - flatened_frag_offsets.data(), - &scan_limit, - &total_matched_init, - cmpt_val_buff.data(), - query_buffers_->getGroupByBuffersPtr(), - error_code, - &num_tables, - join_hash_tables_ptr); + reinterpret_cast(native_code->func())(multifrag_cols_ptr, + &num_fragments, + num_rows_ptr, + flatened_frag_offsets.data(), + &scan_limit, + &total_matched_init, + cmpt_val_buff.data(), + out_buffers, + error_code, + &num_tables, + join_hash_tables_ptr); } else { reinterpret_cast(native_code->func())(multifrag_cols_ptr, &num_fragments, @@ -705,7 +709,7 @@ std::vector QueryExecutionContext::launchCpuCode( &scan_limit, &total_matched_init, init_agg_vals.data(), - out_vec.data(), + out_buffers, error_code, &num_tables, join_hash_tables_ptr); diff --git a/omniscidb/QueryEngine/QueryPhysicalInputsCollector.cpp b/omniscidb/QueryEngine/QueryPhysicalInputsCollector.cpp index fa5eafd6dc..22a8cda70d 100644 --- a/omniscidb/QueryEngine/QueryPhysicalInputsCollector.cpp +++ b/omniscidb/QueryEngine/QueryPhysicalInputsCollector.cpp @@ -75,6 +75,16 @@ class PhysicalInputsNodeVisitor : public RelAlgVisitor { return result; } + ResultType visitShuffle(const hdk::ir::Shuffle* shuffle) const override { + ResultType result; + ExprVisitor visitor; + for (auto& expr : shuffle->exprs()) { + const auto inputs = visitor.visit(expr.get()); + result.insert(inputs.begin(), inputs.end()); + } + return result; + } + protected: ResultType aggregateResult(const ResultType& aggregate, const ResultType& next_result) const override { diff --git a/omniscidb/QueryEngine/QueryRewrite.cpp b/omniscidb/QueryEngine/QueryRewrite.cpp index 902a538191..5d87d7a765 100644 --- a/omniscidb/QueryEngine/QueryRewrite.cpp +++ b/omniscidb/QueryEngine/QueryRewrite.cpp @@ -273,6 +273,9 @@ RelAlgExecutionUnit QueryRewriter::rewriteAggregateOnGroupByColumn( ra_exe_unit_in.query_plan_dag, ra_exe_unit_in.hash_table_build_plan_dag, ra_exe_unit_in.table_id_to_node_map, - ra_exe_unit_in.union_all}; + ra_exe_unit_in.union_all, + ra_exe_unit_in.shuffle_fn, + ra_exe_unit_in.partition_offsets_col, + 
ra_exe_unit_in.partitioned_aggregation}; return rewritten_exe_unit; } diff --git a/omniscidb/QueryEngine/RelAlgExecutionUnit.h b/omniscidb/QueryEngine/RelAlgExecutionUnit.h index 1fb67cce5d..d59e9961cc 100644 --- a/omniscidb/QueryEngine/RelAlgExecutionUnit.h +++ b/omniscidb/QueryEngine/RelAlgExecutionUnit.h @@ -143,9 +143,21 @@ struct RelAlgExecutionUnit { TableIdToNodeMap table_id_to_node_map{}; // empty if not a UNION, true if UNION ALL, false if regular UNION const std::optional union_all; + // If non-empty, then this unit is a shuffling unit with the specified + // function. groupby_exprs hold keys, target_exprs hold all output + // expressions. + std::optional shuffle_fn; + // When shuffling data, this references the column holding computed + // partition offsets. + hdk::ir::ExprPtr partition_offsets_col; + // Set to true when aggregating partitioned input. + bool partitioned_aggregation; std::shared_ptr cost_model; std::vector templs; + + bool isShuffleCount() const { return shuffle_fn && !partition_offsets_col; } + bool isShuffle() const { return shuffle_fn && partition_offsets_col; } }; std::ostream& operator<<(std::ostream& os, const RelAlgExecutionUnit& ra_exe_unit); diff --git a/omniscidb/QueryEngine/RelAlgExecutor.cpp b/omniscidb/QueryEngine/RelAlgExecutor.cpp index 3763e4f949..b7234f5db6 100644 --- a/omniscidb/QueryEngine/RelAlgExecutor.cpp +++ b/omniscidb/QueryEngine/RelAlgExecutor.cpp @@ -38,10 +38,12 @@ #include "QueryEngine/WindowContext.h" #include "QueryEngine/WorkUnitBuilder.h" #include "QueryOptimizer/CanonicalizeQuery.h" +#include "ResultSet/ColRangeInfo.h" #include "ResultSet/HyperLogLog.h" #include "ResultSetRegistry/ResultSetRegistry.h" #include "SchemaMgr/SchemaMgr.h" #include "SessionInfo.h" +#include "Shared/MathUtils.h" #include "Shared/funcannotations.h" #include "Shared/measure.h" #include "Shared/misc.h" @@ -163,9 +165,10 @@ ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co, constexpr bool vlog_result_set_summary{false}; if constexpr (vlog_result_set_summary) { - VLOG(1) << execution_result.getRows()->summaryToString(); + for (size_t i = 0; i < execution_result.getToken()->resultSetCount(); ++i) { + VLOG(1) << execution_result.getToken()->resultSet(i)->summaryToString(); + } } - execution_result.getRows()->moveToBegin(); if (post_execution_callback_) { VLOG(1) << "Running post execution callback."; @@ -187,11 +190,23 @@ ExecutionResult RelAlgExecutor::executeRelAlgQuery(const CompilationOptions& co, return run_query(co_cpu); } -void printTree(const hdk::ir::Node* node, std::string prefix = "|") { - std::cout << prefix << node->toString() << std::endl; - for (size_t i = 0; i < node->inputCount(); ++i) { - printTree(node->getInput(i), prefix + "----"); +std::string treeToString(const hdk::ir::Node* node, + bool stop_on_executed = true, + std::string prefix = "|") { + std::stringstream ss; + ss << prefix << node->toString() << std::endl; + if (!stop_on_executed || !node->getResult()) { + for (size_t i = 0; i < node->inputCount(); ++i) { + ss << treeToString(node->getInput(i), stop_on_executed, prefix + "----"); + } } + return ss.str(); +} + +void printTree(const hdk::ir::Node* node, + bool stop_on_executed = true, + std::string prefix = "|") { + std::cout << treeToString(node, stop_on_executed, prefix); } ExecutionResult RelAlgExecutor::executeRelAlgQueryNoRetry(const CompilationOptions& co, @@ -398,8 +413,11 @@ std::shared_ptr RelAlgExecutor::execute( for (size_t i = 0; i < exec_desc_count; i++) { VLOG(1) << "Executing query step 
" << i; // When we execute the last step, we expect the result to consist of a single - // ResultSet. Also, check if following steps can consume multifrag input. - auto multifrag_result = eo.multifrag_result && (i != (exec_desc_count - 1)); + // ResultSet unless said otherwise by the config. Also, check if following steps + // can consume multifrag input. + auto multifrag_result = + eo.multifrag_result && + (i != (exec_desc_count - 1) || config_.exec.enable_multifrag_execution_result); if (multifrag_result) { for (size_t j = i + 1; j < exec_desc_count; j++) { if (!supportsMultifragInput(seq.step(j), seq.step(i))) { @@ -426,6 +444,9 @@ std::shared_ptr RelAlgExecutor::execute( auto eo_extern = eo; eo_extern.executor_type = ::ExecutorType::Extern; executeStep(seq.step(i), co, eo_extern, queue_time_ms); + } catch (const RequestPartitionedAggregation& e) { + executeStepWithPartitionedAggregation( + seq.step(i), co, eo, e.estimatedBufferSize(), queue_time_ms); } } @@ -548,68 +569,20 @@ const hdk::ir::Type* canonicalTypeForExpr(const hdk::ir::Expr& expr) { return res; } -template std::vector get_targets_meta( - const RA* ra_node, + const hdk::ir::Node* node, const std::vector& target_exprs) { std::vector targets_meta; - CHECK_EQ(ra_node->size(), target_exprs.size()); - for (size_t i = 0; i < ra_node->size(); ++i) { + CHECK_EQ(node->size(), target_exprs.size()); + for (size_t i = 0; i < node->size(); ++i) { CHECK(target_exprs[i]); // TODO(alex): remove the count distinct type fixup. - targets_meta.emplace_back(ra_node->getFieldName(i), + targets_meta.emplace_back(node->getFieldName(i), canonicalTypeForExpr(*target_exprs[i])); } return targets_meta; } -template <> -std::vector get_targets_meta( - const hdk::ir::Node* node, - const std::vector& target_exprs); - -template <> -std::vector get_targets_meta( - const hdk::ir::Filter* filter, - const std::vector& target_exprs) { - return get_targets_meta(filter->getInput(0), target_exprs); -} - -template <> -std::vector get_targets_meta( - const hdk::ir::Sort* sort, - const std::vector& target_exprs) { - return get_targets_meta(sort->getInput(0), target_exprs); -} - -template <> -std::vector get_targets_meta( - const hdk::ir::LogicalUnion* logical_union, - const std::vector& target_exprs) { - return get_targets_meta(logical_union->getInput(0), target_exprs); -} - -template <> -std::vector get_targets_meta( - const hdk::ir::Node* node, - const std::vector& target_exprs) { - if (auto proj = node->as()) { - return get_targets_meta(proj, target_exprs); - } else if (auto logical_union = node->as()) { - return get_targets_meta(logical_union, target_exprs); - } else if (auto agg = node->as()) { - return get_targets_meta(agg, target_exprs); - } else if (auto scan = node->as()) { - return get_targets_meta(scan, target_exprs); - } else if (auto sort = node->as()) { - return get_targets_meta(sort, target_exprs); - } else if (auto filter = node->as()) { - return get_targets_meta(filter, target_exprs); - } - UNREACHABLE() << "Unhandled node type: " << node->toString(); - return {}; -} - bool is_agg_step(const hdk::ir::Node* node) { if (node->getResult() || node->is()) { return false; @@ -617,6 +590,9 @@ bool is_agg_step(const hdk::ir::Node* node) { if (node->is()) { return true; } + if (auto shuffle = node->as()) { + return shuffle->isCount(); + } for (size_t i = 0; i < node->inputCount(); ++i) { if (is_agg_step(node->getInput(i))) { return true; @@ -625,6 +601,61 @@ bool is_agg_step(const hdk::ir::Node* node) { return false; } +class 
NonGroupbyInputColIndicesCollector
+ : public hdk::ir::ExprCollector,
+ NonGroupbyInputColIndicesCollector> {
+ public:
+ NonGroupbyInputColIndicesCollector(const hdk::ir::Aggregate* agg) : agg_(agg) {}
+
+ static std::set collect(const hdk::ir::Aggregate* agg) {
+ return BaseClass::collect(agg->getAggs(), agg);
+ }
+
+ protected:
+ void visitColumnRef(const hdk::ir::ColumnRef* col_ref) override {
+ if (col_ref->index() >= agg_->getGroupByCount()) {
+ result_.insert(col_ref->index());
+ }
+ }
+
+ const hdk::ir::Aggregate* agg_;
+};
+
+void buildUsedAggColumns(const hdk::ir::Aggregate* agg,
+ hdk::ir::ExprPtrVector& out_exprs,
+ std::vector& out_fields,
+ std::unordered_map& out_mapping) {
+ auto input = agg->getInput(0);
+ for (size_t i = 0; i < agg->getGroupByCount(); ++i) {
+ out_exprs.push_back(hdk::ir::getNodeColumnRef(input, static_cast(i)));
+ out_fields.push_back(input->getFieldName(i));
+ }
+ auto data_cols = NonGroupbyInputColIndicesCollector::collect(agg);
+ for (auto& idx : data_cols) {
+ out_mapping[idx] = static_cast(out_exprs.size());
+ out_exprs.push_back(hdk::ir::getNodeColumnRef(input, idx));
+ out_fields.push_back(input->getFieldName(idx));
+ }
+}
+
+// Check if a node should be executed before shuffle can be applied to it.
+// Shuffle goes in two passes and, due to the lack of an appropriate cost model,
+// we prefer shuffle's input to be materialized. The exception is a simple
+// projection.
+// TODO: enable more cases when a cheap operation can be merged into shuffle.
+bool shouldMaterializeShuffleInput(const hdk::ir::Node* node) {
+ if (node->getResult() || node->is()) {
+ return false;
+ }
+
+ auto proj = node->as();
+ if (!proj) {
+ return true;
+ }
+
+ return !proj->isSimple() || shouldMaterializeShuffleInput(proj->getInput(0));
+}
+
} // namespace
hdk::ir::ExprPtr set_transient_dict(const hdk::ir::ExprPtr expr) {
@@ -662,15 +693,162 @@ hdk::ir::ExprPtr translate(const hdk::ir::Expr* expr,
 return res;
}
+void RelAlgExecutor::executeStepWithPartitionedAggregation(const hdk::ir::Node* step_root,
+ const CompilationOptions& co,
+ const ExecutionOptions& eo,
+ size_t estimated_buffer_size,
+ const int64_t queue_time_ms) {
+ auto sort = step_root->as();
+ auto agg = sort ? sort->getInput(0)->as()
+ : step_root->as();
+ CHECK(agg);
+
+ // Materialize data to shuffle if required.
+ auto agg_input = agg->getInput(0);
+ auto agg_input_shared = const_cast(agg)->getAndOwnInput(0);
+ std::shared_ptr proj;
+ std::unordered_map input_map;
+ if (shouldMaterializeShuffleInput(agg_input)) {
+ // Make an additional projection to possibly filter out unused columns (e.g. in
+ // case of a join).
+ hdk::ir::ExprPtrVector exprs;
+ std::vector fields;
+ buildUsedAggColumns(agg, exprs, fields, input_map);
+ proj = std::make_shared(
+ std::move(exprs), std::move(fields), agg_input_shared);
+ VLOG(1) << "Materializing data for partitioning.";
+ executeStep(proj.get(), co, eo, queue_time_ms);
+ }
+
+ // Now that data to shuffle is materialized, we can start shuffling. As the first
+ // step, we choose the desired number of resulting partitions and compute their sizes.
+ // By default, we want enough partitions to load all cores, but not so many output
+ // streams that partitioning itself slows down too much.
+ size_t min_partitions = config_.exec.group_by.min_partitions
+ ? 
config_.exec.group_by.min_partitions + : cpu_threads() * 2; + size_t max_partitions = std::max(config_.exec.group_by.max_partitions, min_partitions); + // For now, we require number of partitions to be power of 2 to avoid division + // operation in partitioning code. + min_partitions = shared::roundUpToPow2(min_partitions); + max_partitions = shared::roundUpToPow2(max_partitions); + size_t partitions = min_partitions; + // Increase number of partitions until we achieve target output buffer size or + // hit the partitioning limit. + while (partitions < max_partitions && + estimated_buffer_size / partitions > + config_.exec.group_by.partitioning_buffer_target_size) { + partitions = partitions * 2; + } + VLOG(1) << "Selected to use " << partitions << " partitions. Used range is [" + << min_partitions << ", " << max_partitions << "]. Estimated buffer size is " + << estimated_buffer_size; + hdk::ir::ShuffleFunction shuffle_fn{hdk::ir::ShuffleFunction::kHash, partitions}; + VLOG(1) << "Selected shuffle function is " << shuffle_fn; + hdk::ir::NodePtr shuffle_input = proj ? proj : agg_input_shared; + hdk::ir::ExprPtrVector shuffle_keys; + for (unsigned i = 0; i < static_cast(agg->getGroupByCount()); ++i) { + shuffle_keys.push_back(getNodeColumnRef(shuffle_input.get(), i)); + } + hdk::ir::NodePtr count_shuffle_node = std::make_shared( + shuffle_keys, + hdk::ir::makeExpr(hdk::ir::Context::defaultCtx().int64(false), + hdk::ir::AggType::kCount, + nullptr, + false, + nullptr), + "part_size", + shuffle_fn, + shuffle_input); + VLOG(1) << "Execute COUNT(*) for data shuffle."; + { + auto timer = DEBUG_TIMER("Partitions histogram computation"); + executeStep( + count_shuffle_node.get(), co, eo.with_columnar_output(true), queue_time_ms); + } + + // With partition sizes now known, start actual data shuffling. + hdk::ir::ExprPtrVector shuffle_input_cols; + std::vector fields; + if (proj) { + shuffle_input_cols = hdk::ir::getNodeColumnRefs(proj.get()); + fields = proj->getFields(); + } else { + hdk::ir::ExprPtrVector exprs; + buildUsedAggColumns(agg, shuffle_input_cols, fields, input_map); + } + hdk::ir::ExprPtrVector shuffle_exprs; + auto offsets_ref = hdk::ir::getNodeColumnRef(count_shuffle_node.get(), 0); + hdk::ir::NodePtr shuffle_node = std::make_shared( + std::move(shuffle_keys), + std::move(shuffle_input_cols), + std::move(fields), + shuffle_fn, + std::vector({shuffle_input, count_shuffle_node})); + VLOG(1) << "Execute data shuffle."; + auto shuffle_eo = eo; + shuffle_eo.output_columnar_hint = true; + shuffle_eo.preserve_order = false; + auto shuffle_co = co; + shuffle_co.allow_lazy_fetch = false; + { + auto timer = DEBUG_TIMER("Data shuffling"); + executeStep(shuffle_node.get(), shuffle_co, shuffle_eo, queue_time_ms); + } + + // Now we can remove projection and its result to free memory. + if (proj) { + temporary_tables_.erase(-proj->getId()); + proj.reset(); + } + + // Create new aggregation node and execute it. + auto part_agg = std::make_shared( + agg->getGroupByCount(), agg->getAggs(), agg->getFields(), agg_input_shared); + part_agg->replaceInput(agg_input_shared, shuffle_node, input_map); + part_agg->setPartitioned(true); + // Use max partition size for a new entry count guess. 
+ auto count_res_token = temporary_tables_.at(-count_shuffle_node->getId()); + const uint64_t* size_buf = reinterpret_cast( + count_res_token->resultSet(count_res_token->resultSetCount() - 1) + ->getStorage() + ->getUnderlyingBuffer()); + auto max_partition_size = *std::max_element(size_buf, size_buf + partitions); + part_agg->setBufferEntryCountHint(max_partition_size * 2); + VLOG(1) << "Using buffer entry count hint for partitioned aggregation: " + << part_agg->bufferEntryCountHint(); + hdk::ir::NodePtr new_root = part_agg; + if (sort) { + new_root = sort->deepCopy(); + new_root->replaceInput(new_root->getAndOwnInput(0), part_agg); + } + VLOG(1) << "Execute partitioned aggregation."; + { + auto timer = DEBUG_TIMER("Partitioned aggregation"); + executeStep(new_root.get(), co, eo, queue_time_ms); + } + + // Register result as a temporary table for the original node. + addTemporaryTable(-step_root->getId(), new_root->getResult()->getToken()); + step_root->setResult(new_root->getResult()); + // Remove temporary tables we don't need anymore. + temporary_tables_.erase(-count_shuffle_node->getId()); + temporary_tables_.erase(-shuffle_node->getId()); + temporary_tables_.erase(-part_agg->getId()); + temporary_tables_.erase(-new_root->getId()); +} + void RelAlgExecutor::executeStep(const hdk::ir::Node* step_root, const CompilationOptions& co, const ExecutionOptions& eo, const int64_t queue_time_ms) { + VLOG(1) << "Executing query step:\n" << treeToString(step_root, true); ExecutionResult res; try { // TODO: move allow_speculative_sort to ExecutionOptions? res = executeStep(step_root, co, eo, queue_time_ms, true); } catch (const SpeculativeTopNFailed& e) { + VLOG(1) << "Retrying step with disabled speculative TopN."; res = executeStep(step_root, co, eo, queue_time_ms, false); } @@ -1088,6 +1266,88 @@ RelAlgExecutionUnit decide_approx_count_distinct_implementation( return ra_exe_unit; } +// Here we try to decide if we should use hash partitioning for groupby execution. +// Partitioned aggregation is beneficial in case of a very large output buffer. +// Partitioning allows us to reduce the size of the buffer used for each partition +// and also completely avoid reduction. +// Current heuristic is very simple and covers the case when a lot of very small +// groups are processed. +void maybeRequestPartitionedAggregation(const RelAlgExecutionUnit& ra_exe_unit, + size_t estimated_buffer_entries, + const CompilationOptions& co, + DataProvider* data_provider, + const Config& config) { + // Partitioned aggregation is supported for CPU only. + if (co.device_type != ExecutorDeviceType::CPU) { + VLOG(1) << "Ignore partitioned aggregation option for non-CPU device"; + return; + } + + // Check if allowed by the config. + if (!config.exec.group_by.enable_cpu_partitioned_groupby) { + VLOG(1) << "Ignore partitioned aggregation option. Disabled by config."; + return; + } + + for (auto& input : ra_exe_unit.input_col_descs) { + if (input->type()->isVarLen()) { + VLOG(1) << "Drop partitioned aggregation option due to varlen data."; + return; + } + } + + // Check output buffer size threshold. + size_t entry_size = 0; + for (auto& key : ra_exe_unit.groupby_exprs) { + if (!key) { + entry_size += 8; + } else { + if (key->type()->isString() || key->type()->isArray()) { + entry_size += 16; + } else { + entry_size += key->type()->canonicalSize(); + } + } + } + for (auto expr : ra_exe_unit.target_exprs) { + // Skip key columns. 
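
To illustrate the two thresholds checked below with hypothetical numbers: an 8-byte key plus an 8-byte COUNT slot give entry_size = 16, so 10M estimated buffer entries mean a 160 MB output buffer. With a 100 MB partitioning_buffer_size_threshold, 25M input rows, and partitioning_group_size_threshold = 10, partitioned aggregation is requested because 160 MB passes the size check and 2 * 25M < 10M * 10 passes the group-size check.
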
if (!expr->is()) {
+ entry_size += expr->type()->canonicalSize();
+ }
+ }
+ if (estimated_buffer_entries * entry_size <
+ config.exec.group_by.partitioning_buffer_size_threshold) {
+ VLOG(1)
+ << "Drop partitioned aggregation option due to the small output buffer size of "
+ << (estimated_buffer_entries * entry_size) << " bytes. Threshold value is "
+ << config.exec.group_by.partitioning_buffer_size_threshold;
+ return;
+ }
+
+ // Request shuffle when the number of estimated groups is comparable to the input
+ // row count. For now, ignore the fact that joins and filters change the number of
+ // rows to be aggregated, and simply use the size of the outermost table.
+ // The entry count is 2x the estimated number of groups. Use partitioning if we
+ // expect fewer than the configured threshold number of rows per group on average.
+ auto table_meta =
+ data_provider->getTableMetadata(ra_exe_unit.input_descs[0].getDatabaseId(),
+ ra_exe_unit.input_descs[0].getTableId());
+ if (table_meta.getNumTuples() * 2 <
+ estimated_buffer_entries * config.exec.group_by.partitioning_group_size_threshold) {
+ LOG(INFO) << "Requesting partitioned aggregation (entries="
+ << estimated_buffer_entries << ", rows=" << table_meta.getNumTuples()
+ << ")";
+ throw RequestPartitionedAggregation(entry_size, estimated_buffer_entries);
+ }
+
+ VLOG(1) << "Drop partitioned aggregation option due to the small estimated number of "
+ "groups. Input rows: "
+ << table_meta.getNumTuples()
+ << " Estimated group count: " << (estimated_buffer_entries / 2)
+ << " Threshold ratio: "
+ << config.exec.group_by.partitioning_group_size_threshold;
+}
+
} // namespace
ExecutionResult RelAlgExecutor::executeWorkUnit(
@@ -1202,10 +1462,17 @@ ExecutionResult RelAlgExecutor::executeWorkUnit(
 result = execute_and_handle_errors(
 card, /*has_cardinality_estimation=*/true, /*has_ndv_estimation=*/false);
 } else {
- result = execute_and_handle_errors(max_groups_buffer_entry_guess,
- groups_approx_upper_bound(table_infos) <=
- config_.exec.group_by.big_group_threshold,
- /*has_ndv_estimation=*/false);
+ result = execute_and_handle_errors(
+ max_groups_buffer_entry_guess,
+ // In case of partitioned aggregation, we use the max partition size as an
+ // estimation.
+ // In case of small enough input, we use the configured default estimation.
+ // Otherwise, we state that we don't have an estimation, and a cardinality
+ // estimator would be requested in case of baseline hash groupby. 
+          ra_exe_unit.partitioned_aggregation ||
+              groups_approx_upper_bound(table_infos) <=
+                  config_.exec.group_by.big_group_threshold,
+          /*has_ndv_estimation=*/ra_exe_unit.partitioned_aggregation);
     }
   } catch (const CardinalityEstimationRequired& e) {
     // check the cardinality cache
@@ -1221,6 +1488,8 @@ ExecutionResult RelAlgExecutor::executeWorkUnit(
             : std::min(groups_approx_upper_bound(table_infos),
                        g_estimator_failure_max_groupby_size);
     CHECK_GT(estimated_groups_buffer_entry_guess, size_t(0));
+    maybeRequestPartitionedAggregation(
+        ra_exe_unit, estimated_groups_buffer_entry_guess, co, data_provider_, config_);
     result = execute_and_handle_errors(
         estimated_groups_buffer_entry_guess, true, /*has_ndv_estimation=*/true);
     if (!(eo.just_validate || eo.just_explain)) {
diff --git a/omniscidb/QueryEngine/RelAlgExecutor.h b/omniscidb/QueryEngine/RelAlgExecutor.h
index 3516347cda..c7bdfc3204 100644
--- a/omniscidb/QueryEngine/RelAlgExecutor.h
+++ b/omniscidb/QueryEngine/RelAlgExecutor.h
@@ -126,6 +126,11 @@ class RelAlgExecutor {
                    const CompilationOptions& co,
                    const ExecutionOptions& eo,
                    const int64_t queue_time_ms);
+  void executeStepWithPartitionedAggregation(const hdk::ir::Node* step_root,
+                                             const CompilationOptions& co,
+                                             const ExecutionOptions& eo,
+                                             size_t estimated_buffer_size,
+                                             const int64_t queue_time_ms);
   ExecutionResult executeStep(const hdk::ir::Node* step_root,
                               const CompilationOptions& co,
                               const ExecutionOptions& eo,
diff --git a/omniscidb/QueryEngine/RelAlgVisitor.h b/omniscidb/QueryEngine/RelAlgVisitor.h
index 1e4c670829..27e2f27566 100644
--- a/omniscidb/QueryEngine/RelAlgVisitor.h
+++ b/omniscidb/QueryEngine/RelAlgVisitor.h
@@ -59,6 +59,10 @@ class RelAlgVisitor {
     if (logical_union) {
       return aggregateResult(result, visitLogicalUnion(logical_union));
     }
+    const auto shuffle = dynamic_cast<const hdk::ir::Shuffle*>(rel_alg);
+    if (shuffle) {
+      return aggregateResult(result, visitShuffle(shuffle));
+    }
     LOG(FATAL) << "Unhandled rel_alg type: " << rel_alg->toString();
     return {};
   }
@@ -83,6 +87,8 @@ class RelAlgVisitor {
     return defaultResult();
   }
 
+  virtual T visitShuffle(const hdk::ir::Shuffle*) const { return defaultResult(); }
+
  protected:
   virtual T aggregateResult(const T& aggregate, const T& next_result) const {
     return next_result;
diff --git a/omniscidb/QueryEngine/RowFuncBuilder.cpp b/omniscidb/QueryEngine/RowFuncBuilder.cpp
index 3989fe64e0..815378c165 100644
--- a/omniscidb/QueryEngine/RowFuncBuilder.cpp
+++ b/omniscidb/QueryEngine/RowFuncBuilder.cpp
@@ -45,6 +45,7 @@
 #include "QueryEngine/TopKSort.h"
 #include "QueryEngine/WindowContext.h"
 #include "ResultSet/QueryMemoryDescriptor.h"
+#include "Shared/MathUtils.h"
 #include "Shared/checked_alloc.h"
 #include "Shared/funcannotations.h"
 #include "ThirdParty/robin_hood.h"
@@ -123,6 +124,7 @@ bool RowFuncBuilder::codegen(llvm::Value* filter_result,
 
   {
     const bool is_group_by = !ra_exe_unit_.groupby_exprs.empty();
+    const bool is_shuffle = !!ra_exe_unit_.shuffle_fn;
 
     if (executor_->isArchMaxwell(co.device_type)) {
       executor_->prependForceSync();
@@ -135,7 +137,12 @@ bool RowFuncBuilder::codegen(llvm::Value* filter_result,
                               false);
     filter_false = filter_cfg.cond_false_;
 
-    if (is_group_by) {
+    if (is_shuffle) {
+      filter_cfg.setChainToNext();
+      auto agg_out_ptr_w_idx = codegenPartitionKey(query_mem_desc, co, filter_cfg);
+      can_return_error = codegenShuffle(
+          agg_out_ptr_w_idx, query_mem_desc, co, gpu_smem_context, filter_cfg);
+    } else if (is_group_by) {
       if (query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection &&
           !query_mem_desc.useStreamingTopN()) {
        const auto crt_matched = get_arg_by_name(ROW_FUNC, "crt_matched");
@@ -465,6 +472,69 @@ std::tuple<llvm::Value*, llvm::Value*> RowFuncBuilder::codegenGroupBy(
   return std::make_tuple(nullptr, nullptr);
 }
 
+std::tuple<llvm::Value*, llvm::Value*> RowFuncBuilder::codegenPartitionKey(
+    const QueryMemoryDescriptor& query_mem_desc,
+    const CompilationOptions& co,
+    DiamondCodegen& diamond_codegen) {
+  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
+  auto arg_it = ROW_FUNC->arg_begin();
+  auto groups_buffer = arg_it++;
+
+  std::stack<llvm::BasicBlock*> array_loops;
+  CHECK(query_mem_desc.getQueryDescriptionType() == QueryDescriptionType::Projection ||
+        query_mem_desc.getQueryDescriptionType() ==
+            QueryDescriptionType::GroupByPerfectHash);
+  CHECK(ra_exe_unit_.shuffle_fn);
+  CHECK(ra_exe_unit_.shuffle_fn->kind == hdk::ir::ShuffleFunction::kHash);
+
+  CodeGenerator code_generator(executor_, co.codegen_traits_desc);
+  // The following code implements MurmurHash64A.
+  auto mul_value = LL_INT(0xc6a4a7935bd1e995LLU);
+  auto shr_value = LL_INT(47LLU);
+  uint64_t hash_init_val =
+      0 ^ (ra_exe_unit_.groupby_exprs.size() * 8 * 0xc6a4a7935bd1e995LLU);
+  llvm::Value* cur_hash_value = LL_INT(hash_init_val);
+  for (const auto& group_expr : ra_exe_unit_.groupby_exprs) {
+    // k = static_cast<uint64_t>(val)
+    auto group_key = code_generator.codegen(group_expr.get(), true, co).front();
+    if (!group_key->getType()->isIntegerTy(64)) {
+      group_key =
+          LL_BUILDER.CreateBitCast(executor_->cgen_state_->castToTypeIn(group_key, 64),
+                                   get_int_type(64, LL_CONTEXT));
+    }
+    // k *= m;
+    group_key = LL_BUILDER.CreateMul(group_key, mul_value);
+    // k ^= k >> r;
+    group_key =
+        LL_BUILDER.CreateXor(group_key, LL_BUILDER.CreateLShr(group_key, shr_value));
+    // k *= m;
+    group_key = LL_BUILDER.CreateMul(group_key, mul_value);
+    // h ^= k;
+    cur_hash_value = LL_BUILDER.CreateXor(cur_hash_value, group_key);
+    // h *= m;
+    cur_hash_value = LL_BUILDER.CreateMul(cur_hash_value, mul_value);
+  }
+  // h ^= h >> r;
+  cur_hash_value = LL_BUILDER.CreateXor(
+      cur_hash_value, LL_BUILDER.CreateLShr(cur_hash_value, shr_value));
+  // h *= m;
+  cur_hash_value = LL_BUILDER.CreateMul(cur_hash_value, mul_value);
+  // h ^= h >> r;
+  cur_hash_value = LL_BUILDER.CreateXor(
+      cur_hash_value, LL_BUILDER.CreateLShr(cur_hash_value, shr_value));
+  // h = h % partitions
+  size_t partitions = ra_exe_unit_.shuffle_fn->partitions;
+  if (shared::isPowOfTwo(partitions)) {
+    cur_hash_value = LL_BUILDER.CreateAnd(cur_hash_value, LL_INT(partitions - 1));
+  } else {
+    cur_hash_value = LL_BUILDER.CreateURem(cur_hash_value, LL_INT(partitions));
+  }
+  cur_hash_value = executor_->cgen_state_->castToTypeIn(cur_hash_value, 32);
+  cur_hash_value->setName("partition_key");
+
+  return std::make_tuple(groups_buffer, cur_hash_value);
+}
+
 llvm::Value* RowFuncBuilder::codegenVarlenOutputBuffer(
     const QueryMemoryDescriptor& query_mem_desc,
     const CompilationOptions& co) {
@@ -862,6 +932,92 @@ bool RowFuncBuilder::codegenAggCalls(
   return can_return_error;
 }
 
+bool RowFuncBuilder::codegenShuffle(
+    const std::tuple<llvm::Value*, llvm::Value*>& agg_out_ptr_w_idx,
+    QueryMemoryDescriptor& query_mem_desc,
+    const CompilationOptions& co,
+    const GpuSharedMemoryContext& gpu_smem_context,
+    DiamondCodegen& diamond_codegen) {
+  if (ra_exe_unit_.isShuffleCount()) {
+    TargetExprCodegenBuilder target_builder(ra_exe_unit_, true);
+    CHECK_EQ(ra_exe_unit_.target_exprs.size(), (size_t)1);
+    target_builder(ra_exe_unit_.target_exprs.front(), executor_, query_mem_desc, co);
+    target_builder.codegen(this,
+                           executor_,
+                           query_mem_desc,
+                           co,
+                           gpu_smem_context,
+                           agg_out_ptr_w_idx,
+                           {},
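+                           // The count pass writes through agg_out_ptr_w_idx
+                           // (groups_buffer indexed by partition_key), so no
+                           // output byte stream, row index, or varlen output
+                           // buffer is needed: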
+                           nullptr,
+                           nullptr,
+                           nullptr,
+                           diamond_codegen);
+  } else {
+    // For data shuffle we don't compute the output buffer and offset in the
+    // regular way because each output partition has its own size.
+    // The first element of the agg_out_ptr_w_idx pair holds an array of arrays
+    // of output buffer pointers. The partitioning key and then the target index
+    // are used to index these arrays to get the actual output buffer for the
+    // current value.
+    CodeGenerator code_generator(executor_, co.codegen_traits_desc);
+    auto output_buffers = std::get<0>(agg_out_ptr_w_idx);
+    auto partition_key = std::get<1>(agg_out_ptr_w_idx);
+    output_buffers = LL_BUILDER.CreateBitCast(
+        output_buffers,
+        llvm::Type::getInt8PtrTy(LL_CONTEXT)->getPointerTo()->getPointerTo());
+    auto partition_byte_streams = LL_BUILDER.CreateLoad(
+        llvm::Type::getInt8PtrTy(LL_CONTEXT)->getPointerTo(),
+        LL_BUILDER.CreateGEP(llvm::Type::getInt8PtrTy(LL_CONTEXT)->getPointerTo(),
+                             output_buffers,
+                             partition_key),
+        "partition_byte_streams");
+    // Read the current partition offset from the offsets column.
+    CHECK(ra_exe_unit_.partition_offsets_col->is<hdk::ir::ColumnVar>());
+    auto offsets_col = code_generator.colByteStream(
+        ra_exe_unit_.partition_offsets_col->as<hdk::ir::ColumnVar>(), true, false);
+    offsets_col = LL_BUILDER.CreateBitCast(
+        offsets_col, llvm::Type::getInt64PtrTy(LL_CONTEXT), "partition_offsets");
+    auto offset_ptr = LL_BUILDER.CreateGEP(llvm::Type::getInt64Ty(LL_CONTEXT),
+                                           offsets_col,
+                                           partition_key,
+                                           "partition_offset_ptr");
+    auto offset_val = LL_BUILDER.CreateLoad(
+        llvm::Type::getInt64Ty(LL_CONTEXT), offset_ptr, "partition_offset");
+    TargetExprCodegenBuilder target_builder(ra_exe_unit_, true);
+    for (size_t target_idx = 0; target_idx < ra_exe_unit_.target_exprs.size();
+         ++target_idx) {
+      auto target_expr = ra_exe_unit_.target_exprs[target_idx];
+      auto target_byte_stream =
+          LL_BUILDER.CreateLoad(llvm::Type::getInt8PtrTy(LL_CONTEXT),
+                                LL_BUILDER.CreateGEP(llvm::Type::getInt8PtrTy(LL_CONTEXT),
+                                                     partition_byte_streams,
+                                                     LL_INT(target_idx)),
+                                "target_byte_stream_" + std::to_string(target_idx));
+      size_t chosen_bytes = query_mem_desc.getPaddedSlotWidthBytes(target_idx);
+
+      target_builder(target_expr, executor_, query_mem_desc, co);
+      target_builder.codegenSingleTarget(this,
+                                         executor_,
+                                         query_mem_desc,
+                                         co,
+                                         gpu_smem_context,
+                                         agg_out_ptr_w_idx,
+                                         {},
+                                         target_byte_stream,
+                                         offset_val,
+                                         nullptr,
+                                         diamond_codegen,
+                                         target_idx);
+    }
+    // Increment the offset value.
+    auto new_offset_val =
+        LL_BUILDER.CreateAdd(offset_val, LL_INT(1LLU), "new_partition_offset");
+    LL_BUILDER.CreateStore(new_offset_val, offset_ptr);
+  }
+
+  return false;
+}
+
 /**
  * @brief: returns the pointer to where the aggregation should be stored.
  */
@@ -883,7 +1039,6 @@ llvm::Value* RowFuncBuilder::codegenAggColumnPtr(
                chosen_bytes == 8);
         CHECK(output_buffer_byte_stream);
         CHECK(out_row_idx);
-        size_t col_off = query_mem_desc.getColOffInBytes(agg_out_off);
         // multiplying by chosen_bytes, i.e., << log2(chosen_bytes)
 #ifdef _WIN32
         auto out_per_col_byte_idx =
@@ -892,7 +1047,16 @@ llvm::Value* RowFuncBuilder::codegenAggColumnPtr(
         auto out_per_col_byte_idx =
             LL_BUILDER.CreateShl(out_row_idx, __builtin_ffs(chosen_bytes) - 1);
 #endif
-        auto byte_offset = LL_BUILDER.CreateAdd(out_per_col_byte_idx, LL_INT(col_off));
+
+        llvm::Value* byte_offset;
+        // For shuffle we use multiple destinations with different column sizes, so
+        // we prepare the column pointer in advance instead of adding an offset here.
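+        // For the shuffle path, output_buffer_byte_stream already points at
+        // the per-partition column loaded in codegenShuffle and out_row_idx is
+        // the current partition offset, so the row index alone addresses the
+        // right slot.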
+        if (ra_exe_unit_.isShuffle()) {
+          byte_offset = out_per_col_byte_idx;
+        } else {
+          size_t col_off = query_mem_desc.getColOffInBytes(agg_out_off);
+          byte_offset = LL_BUILDER.CreateAdd(out_per_col_byte_idx, LL_INT(col_off));
+        }
         byte_offset->setName("out_byte_off_target_" + std::to_string(target_idx));
         auto output_ptr = LL_BUILDER.CreateGEP(
             output_buffer_byte_stream->getType()->getScalarType()->getPointerElementType(),
@@ -1245,6 +1409,13 @@ std::vector<llvm::Value*> RowFuncBuilder::codegenAggArg(const hdk::ir::Expr* tar
       target_expr, !executor_->plan_state_->allow_lazy_fetch_, co);
 }
 
+llvm::Value* RowFuncBuilder::emitCall(llvm::IRBuilder<>& ir_builder,
+                                      const std::string& fname,
+                                      const std::vector<llvm::Value*>& args) {
+  AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
+  return executor_->cgen_state_->emitCall(ir_builder, fname, args);
+}
+
 llvm::Value* RowFuncBuilder::emitCall(const std::string& fname,
                                       const std::vector<llvm::Value*>& args) {
   AUTOMATIC_IR_METADATA(executor_->cgen_state_.get());
diff --git a/omniscidb/QueryEngine/RowFuncBuilder.h b/omniscidb/QueryEngine/RowFuncBuilder.h
index 2d9ba331a6..b279f1df36 100644
--- a/omniscidb/QueryEngine/RowFuncBuilder.h
+++ b/omniscidb/QueryEngine/RowFuncBuilder.h
@@ -60,6 +60,10 @@ class RowFuncBuilder {
                 const QueryMemoryDescriptor& query_mem_desc,
                 const CompilationOptions& co,
                 DiamondCodegen& codegen);
+  std::tuple<llvm::Value*, llvm::Value*> codegenPartitionKey(
+      const QueryMemoryDescriptor& query_mem_desc,
+      const CompilationOptions& co,
+      DiamondCodegen& codegen);
 
   llvm::Value* codegenVarlenOutputBuffer(const QueryMemoryDescriptor& query_mem_desc,
                                          const CompilationOptions& co);
@@ -101,6 +105,11 @@ class RowFuncBuilder {
                        const CompilationOptions& co,
                        const GpuSharedMemoryContext& gpu_smem_context,
                        DiamondCodegen& diamond_codegen);
+  bool codegenShuffle(const std::tuple<llvm::Value*, llvm::Value*>& agg_out_ptr_w_idx,
+                      QueryMemoryDescriptor& query_mem_desc,
+                      const CompilationOptions& co,
+                      const GpuSharedMemoryContext& gpu_smem_context,
+                      DiamondCodegen& diamond_codegen);
 
   llvm::Value* codegenWindowRowPointer(const hdk::ir::WindowFunction* window_func,
                                        const QueryMemoryDescriptor& query_mem_desc,
@@ -138,6 +147,9 @@ class RowFuncBuilder {
   std::vector<llvm::Value*> codegenAggArg(const hdk::ir::Expr* target_expr,
                                           const CompilationOptions& co);
 
+  llvm::Value* emitCall(llvm::IRBuilder<>& ir_builder,
+                        const std::string& fname,
+                        const std::vector<llvm::Value*>& args);
   llvm::Value* emitCall(const std::string& fname, const std::vector<llvm::Value*>& args);
 
   void checkErrorCode(llvm::Value* retCode);
diff --git a/omniscidb/QueryEngine/TargetExprBuilder.cpp b/omniscidb/QueryEngine/TargetExprBuilder.cpp
index f58c34cb72..a0331a8013 100644
--- a/omniscidb/QueryEngine/TargetExprBuilder.cpp
+++ b/omniscidb/QueryEngine/TargetExprBuilder.cpp
@@ -628,6 +628,37 @@ void TargetExprCodegenBuilder::codegen(
   }
 }
 
+void TargetExprCodegenBuilder::codegenSingleTarget(
+    RowFuncBuilder* row_func_builder,
+    Executor* executor,
+    const QueryMemoryDescriptor& query_mem_desc,
+    const CompilationOptions& co,
+    const GpuSharedMemoryContext& gpu_smem_context,
+    const std::tuple<llvm::Value*, llvm::Value*>& agg_out_ptr_w_idx,
+    const std::vector<llvm::Value*>& agg_out_vec,
+    llvm::Value* output_buffer_byte_stream,
+    llvm::Value* out_row_idx,
+    llvm::Value* varlen_output_buffer,
+    DiamondCodegen& diamond_codegen,
+    size_t target_idx) const {
+  CHECK(row_func_builder);
+  CHECK(executor);
+  AUTOMATIC_IR_METADATA(executor->cgen_state_.get());
+
+  CHECK_LT(target_idx, target_exprs_to_codegen.size());
+  target_exprs_to_codegen[target_idx].codegen(row_func_builder,
+                                              executor,
+                                              query_mem_desc,
+                                              co,
+                                              gpu_smem_context,
+                                              agg_out_ptr_w_idx,
+                                              agg_out_vec,
+                                              output_buffer_byte_stream,
+                                              out_row_idx,
+                                              varlen_output_buffer,
+                                              diamond_codegen);
+}
+
 void TargetExprCodegenBuilder::codegenSampleExpressions(
     RowFuncBuilder* row_func_builder,
     Executor* executor,
diff --git a/omniscidb/QueryEngine/TargetExprBuilder.h b/omniscidb/QueryEngine/TargetExprBuilder.h
index b3824af9c8..98994891bc 100644
--- a/omniscidb/QueryEngine/TargetExprBuilder.h
+++ b/omniscidb/QueryEngine/TargetExprBuilder.h
@@ -96,6 +96,20 @@ struct TargetExprCodegenBuilder {
       llvm::Value* varlen_output_buffer,
       DiamondCodegen& diamond_codegen) const;
 
+  void codegenSingleTarget(
+      RowFuncBuilder* row_func_builder,
+      Executor* executor,
+      const QueryMemoryDescriptor& query_mem_desc,
+      const CompilationOptions& co,
+      const GpuSharedMemoryContext& gpu_smem_context,
+      const std::tuple<llvm::Value*, llvm::Value*>& agg_out_ptr_w_idx,
+      const std::vector<llvm::Value*>& agg_out_vec,
+      llvm::Value* output_buffer_byte_stream,
+      llvm::Value* out_row_idx,
+      llvm::Value* varlen_output_buffer,
+      DiamondCodegen& diamond_codegen,
+      size_t target_idx) const;
+
   void codegenSampleExpressions(
       RowFuncBuilder* row_func_builder,
       Executor* executor,
diff --git a/omniscidb/QueryEngine/Visitors/TemplateAggregationVisitor.h b/omniscidb/QueryEngine/Visitors/TemplateAggregationVisitor.h
index 5081e3e401..9790f32c45 100644
--- a/omniscidb/QueryEngine/Visitors/TemplateAggregationVisitor.h
+++ b/omniscidb/QueryEngine/Visitors/TemplateAggregationVisitor.h
@@ -45,6 +45,10 @@ class TemplateAggregationVisitor : public RelAlgVisitor<TemplateSample> {
     return rejectNode();
   }
 
+  virtual TemplateSample visitShuffle(const hdk::ir::Shuffle*) const {
+    return rejectNode();
+  }
+
   std::vector<TemplateSample> getTemplates() {
     std::vector<TemplateSample> ret(collected_templates_);
     collected_templates_.clear();
diff --git a/omniscidb/QueryEngine/WorkUnitBuilder.cpp b/omniscidb/QueryEngine/WorkUnitBuilder.cpp
index b3df29d636..801f53aa2c 100644
--- a/omniscidb/QueryEngine/WorkUnitBuilder.cpp
+++ b/omniscidb/QueryEngine/WorkUnitBuilder.cpp
@@ -350,7 +350,10 @@ RelAlgExecutionUnit WorkUnitBuilder::exeUnit() const {
           query_plan_dag_,
           hash_table_build_plan_dag_,
           table_id_to_node_map_,
-          union_all_};
+          union_all_,
+          shuffle_fn_,
+          partition_offsets_col_,
+          partitioned_aggregation_};
 }
 
 void WorkUnitBuilder::build() {
@@ -430,6 +433,8 @@ void WorkUnitBuilder::process(const ir::Node* node) {
     processUnion(node->as<ir::LogicalUnion>());
   } else if (node->is<ir::Join>()) {
     processJoin(node->as<ir::Join>());
+  } else if (node->is<ir::Shuffle>()) {
+    processShuffle(node->as<ir::Shuffle>());
   } else {
     CHECK(false) << "Unsupported node: " + node->toString();
   }
@@ -465,6 +470,10 @@ void WorkUnitBuilder::processAggregate(const ir::Aggregate* agg) {
   }
   target_exprs_[rte_idx] = std::move(new_target_exprs);
   is_agg_ = true;
+  partitioned_aggregation_ = agg->isPartitioned();
+  if (agg->bufferEntryCountHint()) {
+    max_groups_buffer_entry_guess_ = agg->bufferEntryCountHint();
+  }
 }
 
 void WorkUnitBuilder::processProject(const ir::Project* proj) {
@@ -635,6 +644,49 @@ void WorkUnitBuilder::processJoin(const ir::Join* join) {
   left_deep_tree_id_ = join->getId();
 }
 
+void WorkUnitBuilder::processShuffle(const ir::Shuffle* shuffle) {
+  RelAlgTranslator translator(
+      executor_, input_nest_levels_, join_types_, now_, eo_.just_explain);
+  auto rte_idx = all_nest_levels_.at(shuffle);
+  CHECK_EQ(rte_idx, 0);
+
+  ir::ExprPtrVector new_target_exprs;
+  for (auto& expr : shuffle->exprs()) {
+    auto rewritten_expr = input_rewriter_.visit(expr.get());
+    auto target_expr = translate(rewritten_expr.get(), translator, eo_.executor_type);
+    new_target_exprs.emplace_back(std::move(target_expr));
+  }
+
+  for (size_t i = 0; i < shuffle->size(); ++i) {
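+    // Register each output column so that later references to the shuffle
+    // node's column i resolve to the translated expression.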
+    input_rewriter_.addReplacement(shuffle, i, new_target_exprs[i]);
+  }
+  target_exprs_[rte_idx] = std::move(new_target_exprs);
+
+  if (!groupby_exprs_.empty()) {
+    CHECK_EQ(groupby_exprs_.size(), (size_t)1);
+    CHECK(groupby_exprs_.front() == nullptr);
+    groupby_exprs_.clear();
+  }
+
+  for (auto& expr : shuffle->keys()) {
+    auto rewritten_expr = input_rewriter_.visit(expr.get());
+    auto key_expr = translate(rewritten_expr.get(), translator, eo_.executor_type);
+    groupby_exprs_.emplace_back(std::move(key_expr));
+  }
+
+  // If the node has two inputs, then the first column of the second input node
+  // holds the computed offsets.
+  if (shuffle->inputCount() > 1) {
+    CHECK_EQ(shuffle->inputCount(), (size_t)2);
+    auto rewritten_expr =
+        input_rewriter_.visit(hdk::ir::getNodeColumnRef(shuffle->getInput(1), 0).get());
+    partition_offsets_col_ =
+        translate(rewritten_expr.get(), translator, eo_.executor_type);
+  }
+
+  shuffle_fn_ = shuffle->fn();
+}
+
 std::list<hdk::ir::ExprPtr> WorkUnitBuilder::makeJoinQuals(
     const hdk::ir::Expr* join_condition) {
   RelAlgTranslator translator(
@@ -731,6 +783,17 @@ void WorkUnitBuilder::computeSimpleQuals() {
   quals_ = std::move(quals);
 }
 
+void WorkUnitBuilder::assignUnionOrder(const ir::Node* node, int order) {
+  if (auto scan = node->as<ir::Scan>()) {
+    union_order_[{scan->getDatabaseId(), scan->getTableId()}] = order;
+  } else {
+    CHECK(node->getResult());
+    auto token = node->getResult()->getToken();
+    CHECK(token);
+    union_order_[{token->dbId(), token->tableId()}] = order;
+  }
+}
+
 int WorkUnitBuilder::assignNestLevels(const ir::Node* node, int start_idx) {
   all_nest_levels_.emplace(node, start_idx);
 
@@ -743,16 +806,26 @@ int WorkUnitBuilder::assignNestLevels(const ir::Node* node, int start_idx) {
     for (size_t i = 0; i < node->inputCount(); ++i) {
       auto input_node = node->getInput(i);
       assignNestLevels(input_node, start_idx);
-      if (auto scan = input_node->as<ir::Scan>()) {
-        union_order_[{scan->getDatabaseId(), scan->getTableId()}] = i;
+      assignUnionOrder(input_node, static_cast<int>(i));
+    }
+    ++start_idx;
+  } else if (node->is<ir::Shuffle>()) {
+    for (size_t i = 0; i < node->inputCount(); ++i) {
+      auto input_node = node->getInput(i);
+      assignNestLevels(input_node, start_idx);
+    }
+    ++start_idx;
+    if (node->inputCount() > 1) {
+      // union_order_ is used to sort input descriptors. We want the data
+      // table to be first and the partition sizes table to be second.
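+      // Both shuffle inputs were assigned the same nest level in the loop
+      // above, so union_order_ is the only thing that fixes their relative
+      // order among the input descriptors.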
+      CHECK_EQ(input_nest_levels_.size(), (size_t)2);
+      if (input_nest_levels_.begin()->first == node->getInput(1)) {
+        assignUnionOrder((++input_nest_levels_.begin())->first, 0);
       } else {
-        CHECK(input_node->getResult());
-        auto token = input_node->getResult()->getToken();
-        CHECK(token);
-        union_order_[{token->dbId(), token->tableId()}] = i;
+        assignUnionOrder(input_nest_levels_.begin()->first, 0);
       }
+      assignUnionOrder(node->getInput(1), 1);
     }
-    ++start_idx;
   } else {
     for (size_t i = 0; i < node->inputCount(); ++i) {
       start_idx = assignNestLevels(node->getInput(i), start_idx);
@@ -853,6 +926,10 @@ void WorkUnitBuilder::computeInputColDescs() {
       collector.visit(expr.get());
     }
 
+  if (partition_offsets_col_) {
+    collector.visit(partition_offsets_col_.get());
+  }
+
   std::vector<std::shared_ptr<const InputColDescriptor>> col_descs;
   for (auto& col_var : collector.result()) {
     col_descs.push_back(std::make_shared<InputColDescriptor>(col_var.columnInfo(),
@@ -870,7 +947,9 @@ void WorkUnitBuilder::computeInputColDescs() {
     if (tdesc.getTableRef() != processed_table_ref) {
       auto columns = schema_provider_->listColumns(tdesc.getTableRef());
       for (auto& col_info : columns) {
-        col_descs.push_back(std::make_shared<InputColDescriptor>(col_info, 0));
+        if (!col_info->is_rowid) {
+          col_descs.push_back(std::make_shared<InputColDescriptor>(col_info, 0));
+        }
       }
     }
   }
diff --git a/omniscidb/QueryEngine/WorkUnitBuilder.h b/omniscidb/QueryEngine/WorkUnitBuilder.h
index e9245abc27..4223df54b4 100644
--- a/omniscidb/QueryEngine/WorkUnitBuilder.h
+++ b/omniscidb/QueryEngine/WorkUnitBuilder.h
@@ -63,9 +63,11 @@ class WorkUnitBuilder {
   void processSort(const ir::Sort* sort);
   void processUnion(const ir::LogicalUnion* logical_union);
   void processJoin(const ir::Join* join);
+  void processShuffle(const ir::Shuffle* shuffle);
   std::list<hdk::ir::ExprPtr> makeJoinQuals(const hdk::ir::Expr* join_condition);
   void reorderTables();
   void computeSimpleQuals();
+  void assignUnionOrder(const ir::Node* node, int order);
   int assignNestLevels(const ir::Node* node, int start_idx = 0);
   void computeJoinTypes(const ir::Node* node, bool allow_join = true);
   void computeInputDescs();
@@ -133,6 +135,9 @@ class WorkUnitBuilder {
   HashTableBuildDagMap hash_table_build_plan_dag_;
   TableIdToNodeMap table_id_to_node_map_;
   std::optional<bool> union_all_;
+  std::optional<hdk::ir::ShuffleFunction> shuffle_fn_;
+  hdk::ir::ExprPtr partition_offsets_col_;
+  bool partitioned_aggregation_ = false;
 
   std::optional<unsigned> left_deep_tree_id_;
   size_t max_groups_buffer_entry_guess_;
diff --git a/omniscidb/Shared/Config.h b/omniscidb/Shared/Config.h
index 0fb84c9389..f33fd1c285 100644
--- a/omniscidb/Shared/Config.h
+++ b/omniscidb/Shared/Config.h
@@ -51,6 +51,12 @@ struct GroupByConfig {
   size_t baseline_threshold = 1'000'000;
   int64_t large_ndv_threshold = 10'000'000;
   size_t large_ndv_multiplier = 256;
+  bool enable_cpu_partitioned_groupby = true;
+  size_t partitioning_buffer_size_threshold = 1 << 30;
+  double partitioning_group_size_threshold = 2.0;
+  size_t min_partitions = 0;
+  size_t max_partitions = 1024;
+  size_t partitioning_buffer_target_size = 32 << 20;
 };
 
 struct WindowFunctionsConfig {
@@ -101,6 +107,7 @@ struct ExecutionConfig {
   bool enable_interop = false;
   size_t parallel_linearization_threshold = 10'000;
   bool enable_multifrag_rs = true;
+  bool enable_multifrag_execution_result = false;
 
   size_t override_gpu_block_size = 0;
   size_t override_gpu_grid_size = 0;
diff --git a/omniscidb/Shared/MathUtils.cpp b/omniscidb/Shared/MathUtils.cpp
index 37af76ed66..3e1021dc56 100644
--- a/omniscidb/Shared/MathUtils.cpp
+++ b/omniscidb/Shared/MathUtils.cpp
@@ -14,20 +14,4 @@
  * limitations under the License.
 */
 
-namespace shared {
-
-bool isPowOfTwo(unsigned n) {
-  return (n & (n - 1)) == 0;
-}
-
-unsigned getExpOfTwo(unsigned n) {
-  unsigned i = 0;
-
-  while ((n = n >> 1)) {
-    ++i;
-  }
-
-  return i;
-}
-
-}  // namespace shared
+namespace shared {}  // namespace shared
diff --git a/omniscidb/Shared/MathUtils.h b/omniscidb/Shared/MathUtils.h
index 160e581b27..acb2fd0747 100644
--- a/omniscidb/Shared/MathUtils.h
+++ b/omniscidb/Shared/MathUtils.h
@@ -19,9 +19,32 @@
 
 namespace shared {
 
-bool isPowOfTwo(unsigned n);
+template <typename T, typename = std::enable_if_t<std::is_integral<T>::value>>
+bool isPowOfTwo(T n) {
+  return (n & (n - 1)) == 0;
+}
 
-unsigned getExpOfTwo(unsigned n);
+template <typename T, typename = std::enable_if_t<std::is_integral<T>::value>>
+unsigned getExpOfTwo(T n) {
+  unsigned i = 0;
+
+  while ((n = n >> 1)) {
+    ++i;
+  }
+
+  return i;
+}
+
+template <typename T, typename = std::enable_if_t<std::is_integral<T>::value>>
+T roundUpToPow2(T val) {
+#ifdef _MSC_VER
+  return val == static_cast<T>(1) ? static_cast<T>(1)
+                                  : static_cast<T>(1) << (64 - __lzcnt64(val - 1));
+#else
+  return val == static_cast<T>(1) ? static_cast<T>(1)
+                                  : static_cast<T>(1) << (64 - __builtin_clzl(val - 1));
+#endif
+}
 
 }  // namespace shared
diff --git a/omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.cpp b/omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.cpp
index 4537481009..6a3699b3f3 100644
--- a/omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.cpp
+++ b/omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.cpp
@@ -106,6 +106,21 @@ class ArrowSQLRunnerImpl {
     return res;
   }
 
+  ExecutionResult runQuery(std::unique_ptr<hdk::ir::QueryDag> dag,
+                           ExecutorDeviceType device_type,
+                           bool allow_loop_joins) {
+    auto ra_executor =
+        std::make_unique<RelAlgExecutor>(executor_.get(), schema_mgr_, std::move(dag));
+    auto eo = getExecutionOptions(allow_loop_joins);
+    auto co = getCompilationOptions(device_type);
+    ExecutionResult res;
+
+    execution_time_ += measure<>::execution(
+        [&]() { res = ra_executor->executeRelAlgQuery(co, eo, false); });
+
+    return res;
+  }
+
   ExecutionResult runSqlQuery(const std::string& sql,
                               ExecutorDeviceType device_type,
                               const ExecutionOptions& eo) {
@@ -440,6 +455,13 @@ ExecutionResult runSqlQuery(const std::string& sql,
   return ArrowSQLRunnerImpl::get()->runSqlQuery(sql, device_type, allow_loop_joins);
 }
 
+ExecutionResult runQuery(std::unique_ptr<hdk::ir::QueryDag> dag,
+                         ExecutorDeviceType device_type,
+                         bool allow_loop_joins) {
+  return ArrowSQLRunnerImpl::get()->runQuery(
+      std::move(dag), device_type, allow_loop_joins);
+}
+
 ExecutionOptions getExecutionOptions(bool allow_loop_joins, bool just_explain) {
   return ArrowSQLRunnerImpl::get()->getExecutionOptions(allow_loop_joins, just_explain);
 }
diff --git a/omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.h b/omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.h
index a7fb43a05d..d2c70af66a 100644
--- a/omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.h
+++ b/omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.h
@@ -74,6 +74,10 @@ ExecutionResult runSqlQuery(const std::string& sql,
                             ExecutorDeviceType device_type,
                             bool allow_loop_joins);
 
+ExecutionResult runQuery(std::unique_ptr<hdk::ir::QueryDag> dag,
+                         ExecutorDeviceType device_type = ExecutorDeviceType::CPU,
+                         bool allow_loop_joins = false);
+
 ExecutionOptions getExecutionOptions(bool allow_loop_joins, bool just_explain = false);
 
 CompilationOptions getCompilationOptions(ExecutorDeviceType device_type);
diff --git a/omniscidb/Tests/CMakeLists.txt b/omniscidb/Tests/CMakeLists.txt
index b5da188ef2..30ca53fcc3 100644
--- a/omniscidb/Tests/CMakeLists.txt
+++ b/omniscidb/Tests/CMakeLists.txt
@@ -47,6 +47,7 @@ add_executable(ArrowStorageSqlTest ArrowStorageSqlTest.cpp)
 add_executable(ResultSetArrowConversion ResultSetArrowConversion.cpp)
 add_executable(ExecutionSequenceTest ExecutionSequenceTest.cpp TestRelAlgDagBuilder.cpp)
 add_executable(QueryBuilderTest QueryBuilderTest.cpp TestRelAlgDagBuilder.cpp)
+add_executable(PartitionedGroupByTest PartitionedGroupByTest.cpp)
 
 if(NOT ${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
   add_executable(UdfTest UdfTest.cpp)
@@ -121,6 +122,7 @@ target_link_libraries(ParallelSortTest gtest TBB::tbb Logger)
 target_link_libraries(ResultSetArrowConversion gtest QueryEngine ArrowQueryRunner ArrowStorage ConfigBuilder)
 target_link_libraries(ExecutionSequenceTest gtest QueryEngine ArrowQueryRunner ArrowStorage ConfigBuilder)
 target_link_libraries(QueryBuilderTest gtest QueryBuilder QueryEngine ArrowQueryRunner IR ArrowStorage ConfigBuilder)
+target_link_libraries(PartitionedGroupByTest gtest QueryEngine ArrowQueryRunner IR ArrowStorage ConfigBuilder)
 
 if(NOT ${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
   target_link_libraries(UdfTest gtest UdfCompiler QueryEngine ArrowQueryRunner)
@@ -167,6 +169,7 @@ add_test(ParallelSortTest ParallelSortTest ${TEST_ARGS})
 add_test(ResultSetArrowConversion ResultSetArrowConversion ${TEST_ARGS})
 add_test(ExecutionSequenceTest ExecutionSequenceTest ${TEST_ARGS})
 add_test(QueryBuilderTest QueryBuilderTest ${TEST_ARGS})
+add_test(PartitionedGroupByTest PartitionedGroupByTest ${TEST_ARGS})
 add_test(NAME ArrowBasedExecuteTestColumnarOutputCpuOnly COMMAND ArrowBasedExecuteTest ${TEST_ARGS} "--enable-columnar-output")
 set_tests_properties(ArrowBasedExecuteTestColumnarOutputCpuOnly PROPERTIES LABELS "cpu_only")
@@ -226,6 +229,7 @@ set(TEST_PROGRAMS
   ResultSetArrowConversion
   ExecutionSequenceTest
   QueryBuilderTest
+  PartitionedGroupByTest
 )
 
 if(ENABLE_CUDA)
diff --git a/omniscidb/Tests/PartitionedGroupByTest.cpp b/omniscidb/Tests/PartitionedGroupByTest.cpp
new file mode 100644
index 0000000000..a5efa6db7a
--- /dev/null
+++ b/omniscidb/Tests/PartitionedGroupByTest.cpp
@@ -0,0 +1,177 @@
+/**
+ * Copyright (C) 2023 Intel Corporation
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include "ArrowTestHelpers.h"
+#include "TestHelpers.h"
+
+#include "ArrowSQLRunner/ArrowSQLRunner.h"
+#include "ConfigBuilder/ConfigBuilder.h"
+#include "QueryBuilder/QueryBuilder.h"
+#include "Shared/scope.h"
+
+using namespace std::string_literals;
+using namespace ArrowTestHelpers;
+using namespace TestHelpers::ArrowSQLRunner;
+using namespace hdk;
+using namespace hdk::ir;
+
+EXTERN extern bool g_enable_table_functions;
+
+class PartitionedGroupByTest : public ::testing::Test {
+ protected:
+  static constexpr int TEST_SCHEMA_ID2 = 2;
+  static constexpr int TEST_DB_ID2 = (TEST_SCHEMA_ID2 << 24) + 1;
+  static constexpr size_t row_count = 20;
+  static std::vector<int64_t> id1_vals;
+  static std::vector<int32_t> id2_vals;
+  static std::vector<int16_t> id3_vals;
+  static std::vector<std::string> id4_vals;
+  static std::vector<int32_t> v1_vals;
+  static std::vector<int32_t> v2_vals;
+  static std::vector<int64_t> v1_sums;
+  static std::vector<int64_t> v2_sums;
+
+  static void SetUpTestSuite() {
+    createTable("test1",
+                {{"id1", ctx().int64()},
+                 {"id2", ctx().int32()},
+                 {"id3", ctx().int16()},
+                 {"id4", ctx().extDict(ctx().text(), 0)},
+                 {"v1", ctx().int32()},
+                 {"v2", ctx().int32()}},
+                {row_count / 5});
+    std::stringstream ss;
+    for (size_t i = 1; i <= row_count; ++i) {
+      auto val = i == row_count ? 1000000000000 : i;  // to avoid perfect hash
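+      // A single huge key value blows up the key range, so the perfect-hash
+      // layout is rejected and the group-by takes the baseline hash path,
+      // which is the one that can request partitioned aggregation.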
+      id1_vals.push_back(val);
+      id2_vals.push_back(i * 10);
+      id3_vals.push_back(i * 100);
+      id4_vals.push_back("str"s + ::std::to_string(i));
+      v1_vals.push_back(i * 3);
+      v2_vals.push_back(i * 111);
+      v1_sums.push_back(i * 3);
+      v2_sums.push_back(i * 111);
+      ss << id1_vals.back() << "," << id2_vals.back() << "," << id3_vals.back() << ","
+         << id4_vals.back() << "," << v1_vals.back() << "," << v2_vals.back()
+         << std::endl;
+    }
+    insertCsvValues("test1", ss.str());
+  }
+
+  static void TearDownTestSuite() { dropTable("test1"); }
+};
+
+std::vector<int64_t> PartitionedGroupByTest::id1_vals;
+std::vector<int32_t> PartitionedGroupByTest::id2_vals;
+std::vector<int16_t> PartitionedGroupByTest::id3_vals;
+std::vector<std::string> PartitionedGroupByTest::id4_vals;
+std::vector<int32_t> PartitionedGroupByTest::v1_vals;
+std::vector<int32_t> PartitionedGroupByTest::v2_vals;
+std::vector<int64_t> PartitionedGroupByTest::v1_sums;
+std::vector<int64_t> PartitionedGroupByTest::v2_sums;
+
+TEST_F(PartitionedGroupByTest, SingleKey) {
+  auto old_exec = config().exec;
+  ScopeGuard g([&old_exec]() { config().exec = old_exec; });
+
+  config().exec.group_by.default_max_groups_buffer_entry_guess = 1;
+  config().exec.group_by.big_group_threshold = 1;
+  config().exec.group_by.enable_cpu_partitioned_groupby = true;
+  config().exec.group_by.partitioning_buffer_size_threshold = 10;
+  config().exec.group_by.partitioning_group_size_threshold = 1.5;
+  config().exec.group_by.min_partitions = 2;
+  config().exec.group_by.max_partitions = 8;
+  config().exec.group_by.partitioning_buffer_target_size = 200;
+  config().exec.enable_multifrag_execution_result = true;
+
+  QueryBuilder builder(ctx(), getSchemaProvider(), configPtr());
+  auto scan = builder.scan("test1");
+  auto dag1 = scan.agg({"id1"s}, {"sum(v1)"s}).finalize();
+  auto res1 = runQuery(std::move(dag1));
+  // Check the result has 4 fragments (partitions).
+  ASSERT_EQ(res1.getToken()->resultSetCount(), (size_t)4);
+  auto dag2 = builder.scan(res1.tableName()).sort({0}).finalize();
+  auto res2 = runQuery(std::move(dag2));
+  compare_res_data(res2, id1_vals, v1_sums);
+}
+
+TEST_F(PartitionedGroupByTest, MultipleKeys) {
+  auto old_exec = config().exec;
+  ScopeGuard g([&old_exec]() { config().exec = old_exec; });
+
+  config().exec.group_by.default_max_groups_buffer_entry_guess = 1;
+  config().exec.group_by.big_group_threshold = 1;
+  config().exec.group_by.enable_cpu_partitioned_groupby = true;
+  config().exec.group_by.partitioning_buffer_size_threshold = 10;
+  config().exec.group_by.partitioning_group_size_threshold = 1.5;
+  config().exec.group_by.min_partitions = 2;
+  config().exec.group_by.max_partitions = 8;
+  config().exec.group_by.partitioning_buffer_target_size = 612;
+  config().exec.enable_multifrag_execution_result = true;
+
+  QueryBuilder builder(ctx(), getSchemaProvider(), configPtr());
+  auto scan = builder.scan("test1");
+  auto dag1 =
+      scan.agg({"id1"s, "id2"s, "id3"s, "id4"s}, {"sum(v1)"s, "sum(v2)"s}).finalize();
+  auto res1 = runQuery(std::move(dag1));
+  // Check the result has 4 fragments (partitions).
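+  // The partition count is presumably derived from the estimated buffer size
+  // and partitioning_buffer_target_size, rounded up to a power of two and
+  // clamped to [min_partitions, max_partitions]; with the settings above that
+  // yields 4.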
+  ASSERT_EQ(res1.getToken()->resultSetCount(), (size_t)4);
+  auto dag2 = builder.scan(res1.tableName()).sort({0, 1, 2, 3}).finalize();
+  auto res2 = runQuery(std::move(dag2));
+  compare_res_data(res2, id1_vals, id2_vals, id3_vals, id4_vals, v1_sums, v2_sums);
+}
+
+TEST_F(PartitionedGroupByTest, ReorderedKeys) {
+  auto old_exec = config().exec;
+  ScopeGuard g([&old_exec]() { config().exec = old_exec; });
+
+  config().exec.group_by.default_max_groups_buffer_entry_guess = 1;
+  config().exec.group_by.big_group_threshold = 1;
+  config().exec.group_by.enable_cpu_partitioned_groupby = true;
+  config().exec.group_by.partitioning_buffer_size_threshold = 10;
+  config().exec.group_by.partitioning_group_size_threshold = 1.5;
+  config().exec.group_by.min_partitions = 2;
+  config().exec.group_by.max_partitions = 8;
+  config().exec.group_by.partitioning_buffer_target_size = 612;
+  config().exec.enable_multifrag_execution_result = true;
+
+  QueryBuilder builder(ctx(), getSchemaProvider(), configPtr());
+  auto scan = builder.scan("test1");
+  auto dag1 =
+      scan.agg({"id4"s, "id2"s, "id1"s, "id3"s}, {"sum(v1)"s, "sum(v2)"s}).finalize();
+  auto res1 = runQuery(std::move(dag1));
+  // Check the result has 4 fragments (partitions).
+  ASSERT_EQ(res1.getToken()->resultSetCount(), (size_t)4);
+  auto dag2 = builder.scan(res1.tableName()).sort({2}).finalize();
+  auto res2 = runQuery(std::move(dag2));
+  compare_res_data(res2, id4_vals, id2_vals, id1_vals, id3_vals, v1_sums, v2_sums);
+}
+
+int main(int argc, char* argv[]) {
+  TestHelpers::init_logger_stderr_only(argc, argv);
+  testing::InitGoogleTest(&argc, argv);
+
+  ConfigBuilder builder;
+  builder.parseCommandLineArgs(argc, argv, true);
+  auto config = builder.config();
+
+  // Avoid Calcite initialization for this suite.
+  config->debug.use_ra_cache = "dummy";
+  // Enable table functions. Must be done before init.
+  g_enable_table_functions = true;
+
+  int err{0};
+  try {
+    init(config);
+    err = RUN_ALL_TESTS();
+    reset();
+  } catch (const std::exception& e) {
+    LOG(ERROR) << e.what();
+    return -1;
+  }
+
+  return err;
+}
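For reference, the partitioning hash emitted by RowFuncBuilder::codegenPartitionKey can be read as plain C++. The sketch below is illustrative only: the function name partitionKey and the pre-widened uint64_t key vector are assumptions, not part of the patch. It mirrors the emitted IR: MurmurHash64A with seed 0 over 8-byte-wide group keys, finalized and then reduced to a partition index with a bitwise AND when the partition count is a power of two (equivalent to the URem used otherwise).

#include <cstdint>
#include <vector>

// Mirrors the IR emitted by RowFuncBuilder::codegenPartitionKey: each group
// key is widened to 64 bits and mixed in with the MurmurHash64A constants,
// then the finalized hash is reduced to a partition index.
uint32_t partitionKey(const std::vector<uint64_t>& keys, uint64_t partitions) {
  const uint64_t m = 0xc6a4a7935bd1e995ULL;
  const unsigned r = 47;
  // Seed 0, length = 8 bytes per key, as in the generated code.
  uint64_t h = 0 ^ (keys.size() * 8 * m);
  for (uint64_t k : keys) {
    k *= m;
    k ^= k >> r;
    k *= m;
    h ^= k;
    h *= m;
  }
  h ^= h >> r;
  h *= m;
  h ^= h >> r;
  const bool pow2 = (partitions & (partitions - 1)) == 0;
  return static_cast<uint32_t>(pow2 ? h & (partitions - 1) : h % partitions);
}

Keeping this hash identical between the count-shuffle pass and the data-shuffle pass is what allows the offsets derived from the first pass to address the partitions written by the second.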