diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index d4e94654f61..25ab4b64998 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -2228,8 +2228,41 @@ PlanNodePtr RowNumberNode::create(const folly::dynamic& obj, void* context) { source); } +namespace { +std::unordered_map +rankFunctionNames() { + return { + {TopNRowNumberNode::RankFunction::kRowNumber, "row_number"}, + {TopNRowNumberNode::RankFunction::kRank, "rank"}, + {TopNRowNumberNode::RankFunction::kDenseRank, "dense_rank"}, + }; +} +} // namespace + +// static +const char* TopNRowNumberNode::rankFunctionName( + TopNRowNumberNode::RankFunction function) { + static const auto kFunctionNames = rankFunctionNames(); + auto it = kFunctionNames.find(function); + VELOX_CHECK( + it != kFunctionNames.end(), + "Invalid window type {}", + static_cast(function)); + return it->second.c_str(); +} + +// static +TopNRowNumberNode::RankFunction TopNRowNumberNode::rankFunctionFromName( + const std::string& name) { + static const auto kFunctionNames = invertMap(rankFunctionNames()); + auto it = kFunctionNames.find(name); + VELOX_CHECK(it != kFunctionNames.end(), "Invalid rank function " + name); + return it->second; +} + TopNRowNumberNode::TopNRowNumberNode( PlanNodeId id, + RankFunction function, std::vector partitionKeys, std::vector sortingKeys, std::vector sortingOrders, @@ -2237,6 +2270,7 @@ TopNRowNumberNode::TopNRowNumberNode( int32_t limit, PlanNodePtr source) : PlanNode(std::move(id)), + function_(function), partitionKeys_{std::move(partitionKeys)}, sortingKeys_{std::move(sortingKeys)}, sortingOrders_{std::move(sortingOrders)}, @@ -2274,6 +2308,8 @@ TopNRowNumberNode::TopNRowNumberNode( } void TopNRowNumberNode::addDetails(std::stringstream& stream) const { + stream << rankFunctionName(function_) << " "; + if (!partitionKeys_.empty()) { stream << "partition by ("; addFields(stream, partitionKeys_); @@ -2289,6 +2325,7 @@ void TopNRowNumberNode::addDetails(std::stringstream& stream) const { folly::dynamic TopNRowNumberNode::serialize() const { auto obj = PlanNode::serialize(); + obj["function"] = rankFunctionName(function_); obj["partitionKeys"] = ISerializable::serialize(partitionKeys_); obj["sortingKeys"] = ISerializable::serialize(sortingKeys_); obj["sortingOrders"] = serializeSortingOrders(sortingOrders_); @@ -2310,6 +2347,7 @@ PlanNodePtr TopNRowNumberNode::create( const folly::dynamic& obj, void* context) { auto source = deserializeSingleSource(obj, context); + auto function = rankFunctionFromName(obj["function"].asString()); auto partitionKeys = deserializeFields(obj["partitionKeys"], context); auto sortingKeys = deserializeFields(obj["sortingKeys"], context); @@ -2322,6 +2360,7 @@ PlanNodePtr TopNRowNumberNode::create( return std::make_shared( deserializePlanNodeId(obj), + function, partitionKeys, sortingKeys, sortingOrders, diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 5c2d071c831..665f6d1ce67 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -4733,24 +4733,36 @@ class MarkDistinctNode : public PlanNode { const RowTypePtr outputType_; }; -/// Optimized version of a WindowNode for a single row_number function with a -/// limit over sorted partitions. -/// The output of this node contains all input columns followed by an optional +/// Optimized version of a WindowNode for a single row_number, rank or +/// dense_rank function with a limit over sorted partitions. The output of this +/// node contains all input columns followed by an optional /// 'rowNumberColumnName' BIGINT column. class TopNRowNumberNode : public PlanNode { public: + enum class RankFunction { + kRowNumber, + kRank, + kDenseRank, + }; + + static const char* rankFunctionName(TopNRowNumberNode::RankFunction function); + + static RankFunction rankFunctionFromName(const std::string& name); + + /// @param rankFunction RanksFunction (row_number, rank, dense_rank) for TopN. /// @param partitionKeys Partitioning keys. May be empty. /// @param sortingKeys Sorting keys. May not be empty and may not intersect /// with 'partitionKeys'. /// @param sortingOrders Sorting orders, one per sorting key. /// @param rowNumberColumnName Optional name of the column containing row - /// numbers. If not specified, the output doesn't include 'row number' - /// column. This is used when computing partial results. + /// numbers (or rank and dense_rank). If not specified, the output doesn't + /// include 'row number' column. This is used when computing partial results. /// @param limit Per-partition limit. The number of /// rows produced by this node will not exceed this value for any given /// partition. Extra rows will be dropped. TopNRowNumberNode( PlanNodeId id, + RankFunction function, std::vector partitionKeys, std::vector sortingKeys, std::vector sortingOrders, @@ -4758,6 +4770,36 @@ class TopNRowNumberNode : public PlanNode { int32_t limit, PlanNodePtr source); + /// Note : This constructor is for backwards compatibility. Remove it after + /// migrating Prestissimo to use the new constructor. + /// @param partitionKeys Partitioning keys. May be empty. + /// @param sortingKeys Sorting keys. May not be empty and may not intersect + /// with 'partitionKeys'. + /// @param sortingOrders Sorting orders, one per sorting key. + /// @param rowNumberColumnName Optional name of the column containing row + /// numbers (or rank and dense_rank). If not specified, the output doesn't + /// include 'row number' column. This is used when computing partial results. + /// @param limit Per-partition limit. The number of + /// rows produced by this node will not exceed this value for any given + /// partition. Extra rows will be dropped. + TopNRowNumberNode( + PlanNodeId id, + std::vector partitionKeys, + std::vector sortingKeys, + std::vector sortingOrders, + const std::optional& rowNumberColumnName, + int32_t limit, + PlanNodePtr source) + : TopNRowNumberNode( + id, + RankFunction::kRowNumber, + partitionKeys, + sortingKeys, + sortingOrders, + rowNumberColumnName, + limit, + source) {} + class Builder { public: Builder() = default; @@ -4884,6 +4926,10 @@ class TopNRowNumberNode : public PlanNode { return limit_; } + RankFunction rankFunction() const { + return function_; + } + bool generateRowNumber() const { return outputType_->size() > sources_[0]->outputType()->size(); } @@ -4899,6 +4945,8 @@ class TopNRowNumberNode : public PlanNode { private: void addDetails(std::stringstream& stream) const override; + const RankFunction function_; + const std::vector partitionKeys_; const std::vector sortingKeys_; diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 61cda574177..bb50e39583d 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -19,6 +19,28 @@ namespace facebook::velox::exec { namespace { +#define RANK_FUNCTION_DISPATCH(TEMPLATE_FUNC, functionKind, ...) \ + [&]() { \ + switch (functionKind) { \ + case core::TopNRowNumberNode::RankFunction::kRowNumber: { \ + return TEMPLATE_FUNC< \ + core::TopNRowNumberNode::RankFunction::kRowNumber>(__VA_ARGS__); \ + } \ + case core::TopNRowNumberNode::RankFunction::kRank: { \ + return TEMPLATE_FUNC( \ + __VA_ARGS__); \ + } \ + case core::TopNRowNumberNode::RankFunction::kDenseRank: { \ + return TEMPLATE_FUNC< \ + core::TopNRowNumberNode::RankFunction::kDenseRank>(__VA_ARGS__); \ + } \ + default: \ + VELOX_FAIL( \ + "not a rank function kind: {}", \ + core::TopNRowNumberNode::rankFunctionName(functionKind)); \ + } \ + }() + std::vector reorderInputChannels( const RowTypePtr& inputType, const std::vector& partitionKeys, @@ -113,9 +135,11 @@ TopNRowNumber::TopNRowNumber( node->canSpill(driverCtx->queryConfig()) ? driverCtx->makeSpillConfig(operatorId) : std::nullopt), + rankFunction_(node->rankFunction()), limit_{node->limit()}, generateRowNumber_{node->generateRowNumber()}, numPartitionKeys_{node->partitionKeys().size()}, + numSortingKeys_{node->sortingKeys().size()}, inputChannels_{reorderInputChannels( node->inputType(), node->partitionKeys(), @@ -208,8 +232,12 @@ void TopNRowNumber::addInput(RowVectorPtr input) { // Process input rows. For each row, lookup the partition. If the highest // (top) rank in that partition is less than limit, add the new row. // Otherwise, check if row should replace an existing row or be discarded. - processInputRowLoop(numInput); + RANK_FUNCTION_DISPATCH(processInputRowLoop, rankFunction_, numInput); + // It is determined that the TopNRowNumber (as a partial) is not rejecting + // enough input rows to make the duplicate detection worthwhile. Hence, + // abandon the processing at this partial TopN and let the final TopN do + // the processing. if (abandonPartialEarly()) { abandonedPartial_ = true; addRuntimeStat("abandonedPartial", RuntimeCounter(1)); @@ -219,7 +247,7 @@ void TopNRowNumber::addInput(RowVectorPtr input) { outputRows_.resize(outputBatchSize_); } } else { - processInputRowLoop(numInput); + RANK_FUNCTION_DISPATCH(processInputRowLoop, rankFunction_, numInput); } } @@ -244,7 +272,51 @@ void TopNRowNumber::initializeNewPartitions() { } } -char* TopNRowNumber::processRowWithinLimit( +template <> +char* TopNRowNumber::processRowWithinLimit< + core::TopNRowNumberNode::RankFunction::kRank>( + vector_size_t index, + TopRows& partition) { + // The topRanks queue is not filled yet. + auto& topRows = partition.rows; + if (topRows.empty()) { + partition.topRank = 1; + } else { + // rank assigns all peer rows the same rank, but the rank increments by + // the number of peers when moving between peers. So when adding a new + // row: + // If row == top rank then top rank is unchanged. + // If row < top rank then top rank += 1. + // If row > top, then rank += number of peers of top rank. + auto topRow = topRows.top(); + auto result = comparator_.compare(decodedVectors_, index, topRow); + if (result < 0) { + partition.topRank += 1; + } else if (result > 0) { + partition.topRank += partition.numTopRankRows(); + } + } + return data_->newRow(); +} + +template <> +char* TopNRowNumber::processRowWithinLimit< + core::TopNRowNumberNode::RankFunction::kDenseRank>( + vector_size_t index, + TopRows& partition) { + // The topRanks queue is not filled yet. + // dense_rank will add this row to its partition. But the top rank is + // incremented only if the new row is not a peer of any other existing + // row in the partition queue. + if (!partition.isDuplicate(decodedVectors_, index)) { + partition.topRank++; + } + return data_->newRow(); +} + +template <> +char* TopNRowNumber::processRowWithinLimit< + core::TopNRowNumberNode::RankFunction::kRowNumber>( vector_size_t /*index*/, TopRows& partition) { // row_number accumulates the new row in the partition, and the top rank is @@ -253,7 +325,62 @@ char* TopNRowNumber::processRowWithinLimit( return data_->newRow(); } -char* TopNRowNumber::processRowExceedingLimit( +template <> +char* TopNRowNumber::processRowExceedingLimit< + core::TopNRowNumberNode::RankFunction::kRank>( + vector_size_t index, + TopRows& partition) { + auto& topRows = partition.rows; + // The new row < top rank + // For rank, the new row gets assigned its rank as per its position in the + // queue. But the ranks of all subsequent rows increment by 1. + // So we can remove the rows at the top rank as its rank > limit now. + char* topRow = partition.removeTopRankRows(); + char* newRow = data_->initializeRow(topRow, true /* reuse */); + if (topRows.empty()) { + partition.topRank = 1; + } else { + // The new top rank value depends on the number of peers of the top ranking + // row. If the current row also has the same value as the new top ranking + // row then it has to be counted as a peer as well. + auto numNewTopRankRows = partition.numTopRankRows(); + topRow = topRows.top(); + if (comparator_.compare(decodedVectors_, index, topRow) == 0) { + partition.topRank = topRows.size() - numNewTopRankRows + 1; + } else { + partition.topRank = topRows.size() - numNewTopRankRows + 2; + } + } + return newRow; +} + +template <> +char* TopNRowNumber::processRowExceedingLimit< + core::TopNRowNumberNode::RankFunction::kDenseRank>( + vector_size_t index, + TopRows& partition) { + char* newRow = nullptr; + // The new row < top rank + // For dense_rank: + // i) If the row is a peer of an existing row in the queue, then it + // has the same rank as it. The ranks of other rows are unchanged. So its + // only added to the queue. + // ii) If the row is a distinct new value in the queue, then it is assigned + // a rank as per its position, and the ranks of all subsequent rows +=1. + // So the current top rank rows can be removed from the queue as their new + // rank > limit. + if (partition.isDuplicate(decodedVectors_, index)) { + newRow = data_->newRow(); + } else { + char* topRow = partition.removeTopRankRows(); + newRow = data_->initializeRow(topRow, true /* reuse */); + } + return newRow; +} + +template <> +char* TopNRowNumber::processRowExceedingLimit< + core::TopNRowNumberNode::RankFunction::kRowNumber>( vector_size_t /*index*/, TopRows& partition) { // The new row has rank < highest (aka top) rank at 'limit' function value. @@ -266,13 +393,16 @@ char* TopNRowNumber::processRowExceedingLimit( return data_->initializeRow(topRow, true /* reuse */); } +template void TopNRowNumber::processInputRow(vector_size_t index, TopRows& partition) { auto& topRows = partition.rows; char* newRow = nullptr; if (partition.topRank < limit_) { - newRow = processRowWithinLimit(index, partition); + newRow = processRowWithinLimit(index, partition); } else { + // The partition has now accumulated >= limit rows. So the new rows can be + // rejected or replace existing rows based on the order_by values. char* topRow = topRows.top(); const auto result = comparator_.compare(decodedVectors_, index, topRow); @@ -281,14 +411,18 @@ void TopNRowNumber::processInputRow(vector_size_t index, TopRows& partition) { return; } + // This row has the same value as the top rank row. row_number rejects + // such rows, but are added to the queue for rank and dense_rank. The top + // rank remains unchanged. if (result == 0) { - // The new row has the same value as the top rank row. row_number rejects - // such rows. - return; + if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kRowNumber) { + return; + } + newRow = data_->newRow(); } if (result < 0) { - newRow = processRowExceedingLimit(index, partition); + newRow = processRowExceedingLimit(index, partition); } } @@ -299,14 +433,15 @@ void TopNRowNumber::processInputRow(vector_size_t index, TopRows& partition) { topRows.push(newRow); } +template void TopNRowNumber::processInputRowLoop(vector_size_t numInput) { if (table_) { for (auto i = 0; i < numInput; ++i) { - processInputRow(i, partitionAt(lookup_->hits[i])); + processInputRow(i, partitionAt(lookup_->hits[i])); } } else { for (auto i = 0; i < numInput; ++i) { - processInputRow(i, *singlePartition_); + processInputRow(i, *singlePartition_); } } } @@ -352,10 +487,28 @@ void TopNRowNumber::updateEstimatedOutputRowSize() { } } +vector_size_t TopNRowNumber::fixTopRank(TopRows& partition) { + if (rankFunction_ == core::TopNRowNumberNode::RankFunction::kRank) { + if (partition.topRank > limit_) { + partition.removeTopRankRows(); + auto numNewTopRankRows = partition.numTopRankRows(); + partition.topRank = partition.rows.size() - numNewTopRankRows + 1; + } + } + + return partition.topRank; +} + TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { + auto setNextRankAndPeer = [&](TopRows& partition) { + nextRank_ = fixTopRank(partition); + numPeers_ = 1; + }; + if (!table_) { if (!outputPartitionNumber_) { outputPartitionNumber_ = 0; + setNextRankAndPeer(*singlePartition_); return singlePartition_.get(); } return nullptr; @@ -371,7 +524,6 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { // No more partitions. return nullptr; } - outputPartitionNumber_ = 0; } else { ++outputPartitionNumber_.value(); @@ -381,24 +533,57 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { } } - return &partitionAt(partitions_[outputPartitionNumber_.value()]); + auto partition = &partitionAt(partitions_[outputPartitionNumber_.value()]); + setNextRankAndPeer(*partition); + return partition; } +template +void TopNRowNumber::computeNextRankInMemory( + const TopRows& partition, + vector_size_t outputIndex) { + if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kRowNumber) { + nextRank_ -= 1; + return; + } + + // This is the logic for rank() and dense_rank(). + // If the next row is a peer of the current one, then the rank remains the + // same, but the number of peers is incremented. + if (comparator_.compare(outputRows_[outputIndex], partition.rows.top()) == + 0) { + numPeers_ += 1; + return; + } + + // The new row is not a peer of the current one. So dense_rank drops the + // rank by 1, but rank drops it by the number of peers (which is then reset). + if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kDenseRank) { + nextRank_ -= 1; + } else { + nextRank_ -= numPeers_; + numPeers_ = 1; + } +} + +template void TopNRowNumber::appendPartitionRows( TopRows& partition, vector_size_t numRows, vector_size_t outputOffset, - FlatVector* rowNumbers) { + FlatVector* rankValues) { // The partition.rows priority queue pops rows in order of reverse - // row numbers. - auto rowNumber = partition.rows.size(); + // ranks. Output rows based on nextRank_ and update it with each row. for (auto i = 0; i < numRows; ++i) { - const auto index = outputOffset + i; - if (rowNumbers) { - rowNumbers->set(index, rowNumber--); + auto index = outputOffset + i; + if (rankValues) { + rankValues->set(index, nextRank_); } outputRows_[index] = partition.rows.top(); partition.rows.pop(); + if (!partition.rows.empty()) { + computeNextRankInMemory(partition, index); + } } } @@ -414,7 +599,7 @@ RowVectorPtr TopNRowNumber::getOutput() { return output; } - // We may have input accumulated in 'data_'. + // There could be older rows accumulated in 'data_'. if (data_->numRows() > 0) { return getOutputFromMemory(); } @@ -422,7 +607,7 @@ RowVectorPtr TopNRowNumber::getOutput() { if (noMoreInput_) { finished_ = true; } - + // There is no data to return at this moment. return nullptr; } @@ -430,9 +615,11 @@ RowVectorPtr TopNRowNumber::getOutput() { return nullptr; } + // All the input data is received, so the operator can start producing + // output. RowVectorPtr output; if (merge_ != nullptr) { - output = getOutputFromSpill(); + output = RANK_FUNCTION_DISPATCH(getOutputFromSpill, rankFunction_); } else { output = getOutputFromMemory(); } @@ -467,22 +654,30 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { } } - const auto numOutputRowsLeft = outputBatchSize_ - offset; + auto numOutputRowsLeft = outputBatchSize_ - offset; if (outputPartition_->rows.size() > numOutputRowsLeft) { // Only a partial partition can be output in this getOutput() call. // Output as many rows as possible. - // NOTE: the partial output partition erases the yielded output rows - // and next getOutput() call starts with the remaining rows. - appendPartitionRows( - *outputPartition_, numOutputRowsLeft, offset, rowNumbers); + RANK_FUNCTION_DISPATCH( + appendPartitionRows, + rankFunction_, + *outputPartition_, + numOutputRowsLeft, + offset, + rowNumbers); offset += numOutputRowsLeft; break; } // Add all partition rows. auto numPartitionRows = outputPartition_->rows.size(); - appendPartitionRows( - *outputPartition_, numPartitionRows, offset, rowNumbers); + RANK_FUNCTION_DISPATCH( + appendPartitionRows, + rankFunction_, + *outputPartition_, + numPartitionRows, + offset, + rowNumbers); offset += numPartitionRows; outputPartition_ = nullptr; } @@ -509,13 +704,15 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { return output; } -bool TopNRowNumber::isNewPartition( +bool TopNRowNumber::compareSpillRowColumns( const RowVectorPtr& output, vector_size_t index, - SpillMergeStream* next) { + const SpillMergeStream* next, + vector_size_t startColumn, + vector_size_t endColumn) { VELOX_CHECK_GT(index, 0); - for (auto i = 0; i < numPartitionKeys_; ++i) { + for (auto i = startColumn; i < endColumn; ++i) { if (!output->childAt(inputChannels_[i]) ->equalValueAt( next->current().childAt(i).get(), @@ -527,22 +724,80 @@ bool TopNRowNumber::isNewPartition( return false; } -void TopNRowNumber::setupNextOutput( +// Compares the partition keys for new partitions. +bool TopNRowNumber::isNewPartition( const RowVectorPtr& output, - int32_t rowNumber) { - auto* lookAhead = merge_->next(); - if (lookAhead == nullptr) { - nextRowNumber_ = 0; + vector_size_t index, + const SpillMergeStream* next) { + return compareSpillRowColumns(output, index, next, 0, numPartitionKeys_); +} + +// Compares the sorting keys for determining peers. +bool TopNRowNumber::isNewPeer( + const RowVectorPtr& output, + vector_size_t index, + const SpillMergeStream* next) { + return compareSpillRowColumns( + output, + index, + next, + numPartitionKeys_, + numPartitionKeys_ + numSortingKeys_); +} + +template +void TopNRowNumber::computeNextRankInSpill( + const RowVectorPtr& output, + vector_size_t index, + const SpillMergeStream* next) { + if (isNewPartition(output, index, next)) { + nextRank_ = 1; + numPeers_ = 1; + return; + } + + // The rank always += 1 for row_number. + if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kRowNumber) { + nextRank_ += 1; + return; + } + + // The function is either rank or dense_rank. + // This row belongs to the same partition as the previous row. However, + // it should be determined if it is a peer row as well. If its a peer, + // then increase numPeers_ but the rank remains unchanged. + if (!isNewPeer(output, index, next)) { + numPeers_ += 1; + return; + } + + // The row is not a peer, so increment the rank and peers accordingly. + if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kDenseRank) { + nextRank_ += 1; + numPeers_ = 1; return; } - if (isNewPartition(output, output->size(), lookAhead)) { - nextRowNumber_ = 0; + // Rank function increments by number of peers. + nextRank_ += numPeers_; + numPeers_ = 1; +} + +template +void TopNRowNumber::setupNextOutput(const RowVectorPtr& output) { + auto resetNextRankAndPeer = [this]() { + nextRank_ = 1; + numPeers_ = 1; + }; + + auto* lookAhead = merge_->next(); + if (lookAhead == nullptr) { + resetNextRankAndPeer(); return; } - nextRowNumber_ = rowNumber; - if (nextRowNumber_ < limit_) { + computeNextRankInSpill(output, output->size(), lookAhead); + if (nextRank_ <= limit_) { return; } @@ -550,16 +805,17 @@ void TopNRowNumber::setupNextOutput( lookAhead->pop(); while (auto* next = merge_->next()) { if (isNewPartition(output, output->size(), next)) { - nextRowNumber_ = 0; + resetNextRankAndPeer(); return; } next->pop(); } // This partition is the last partition. - nextRowNumber_ = 0; + resetNextRankAndPeer(); } +template RowVectorPtr TopNRowNumber::getOutputFromSpill() { VELOX_CHECK_NOT_NULL(merge_); @@ -567,37 +823,33 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { // All rows from the same partition will appear together. // We'll identify partition boundaries by comparing partition keys of the // current row with the previous row. When new partition starts, we'll reset - // row number to zero. Once row number reaches the 'limit_', we'll start + // nextRank_ and numPeers_. Once rank reaches the 'limit_', we'll start // dropping rows until the next partition starts. // We'll emit output every time we accumulate 'outputBatchSize_' rows. - auto output = BaseVector::create(outputType_, outputBatchSize_, pool()); - FlatVector* rowNumbers = nullptr; + FlatVector* rankValues = nullptr; if (generateRowNumber_) { - rowNumbers = output->children().back()->as>(); + rankValues = output->children().back()->as>(); } // Index of the next row to append to output. vector_size_t index = 0; - // Row number of the next row in the current partition. - vector_size_t rowNumber = nextRowNumber_; - VELOX_CHECK_LT(rowNumber, limit_); + VELOX_CHECK_LE(nextRank_, limit_); for (;;) { auto next = merge_->next(); if (next == nullptr) { break; } - // Check if this row comes from a new partition. - if (index > 0 && isNewPartition(output, index, next)) { - rowNumber = 0; + if (index > 0) { + computeNextRankInSpill(output, index, next); } // Copy this row to the output buffer if this partition has // < limit_ rows output. - if (rowNumber < limit_) { + if (nextRank_ <= limit_) { for (auto i = 0; i < inputChannels_.size(); ++i) { output->childAt(inputChannels_[i]) ->copy( @@ -606,12 +858,10 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { next->currentIndex(), 1); } - if (rowNumbers) { - // Row numbers start with 1. - rowNumbers->set(index, rowNumber + 1); + if (rankValues) { + rankValues->set(index, nextRank_); } ++index; - ++rowNumber; } // Pop this row from the spill. @@ -622,8 +872,9 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { // Prepare the next batch : // i) If 'limit_' is reached for this partition, then skip the rows // until the next partition. - // ii) If the next row is from a new partition, then reset rowNumber_. - setupNextOutput(output, rowNumber); + // ii) If the next row is from a new partition, then reset nextRank_. + setupNextOutput(output); + return output; } } @@ -786,4 +1037,66 @@ void TopNRowNumber::setupSpiller() { &spillConfig_.value(), spillStats_.get()); } + +// Using the underlying vector of the priority queue for the algorithms to +// check duplicates and count the number of top rank rows. This makes the +// algorithms O(n). There could be other approaches to make the +// algorithms O(1), but would trade memory efficiency. +namespace { +template +S& PriorityQueueVector(std::priority_queue& q) { + struct PrivateQueue : private std::priority_queue { + static S& Container(std::priority_queue& q) { + return q.*&PrivateQueue::c; + } + }; + return PrivateQueue::Container(q); +} +} // namespace + +char* TopNRowNumber::TopRows::removeTopRankRows() { + VELOX_CHECK(!rows.empty()); + + char* topRow = rows.top(); + rows.pop(); + + while (!rows.empty()) { + char* newTopRow = rows.top(); + if (rowComparator.compare(topRow, newTopRow) != 0) { + return topRow; + } + rows.pop(); + } + return topRow; +} + +vector_size_t TopNRowNumber::TopRows::numTopRankRows() { + VELOX_CHECK(!rows.empty()); + char* topRow = rows.top(); + vector_size_t numRows = 0; + const std::vector> partitionRowsVector = + PriorityQueueVector(rows); + for (const char* row : partitionRowsVector) { + if (rowComparator.compare(topRow, row) == 0) { + numRows += 1; + } else { + break; + } + } + return numRows; +} + +bool TopNRowNumber::TopRows::isDuplicate( + const std::vector& decodedVectors, + vector_size_t index) { + const std::vector> partitionRowsVector = + PriorityQueueVector(rows); + for (const char* row : partitionRowsVector) { + if (rowComparator.compare(decodedVectors, index, row) == 0) { + return true; + } + } + return false; +} + } // namespace facebook::velox::exec diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index dc21f0f93c4..a2d1715ea3c 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -84,13 +84,28 @@ class TopNRowNumber : public Operator { std::priority_queue>, Compare> rows; + RowComparator& rowComparator; + // This is the highest rank (this code will be enhanced for rank, dense_rank // soon) seen so far in the input rows. It is compared // with the limit for the operator. int64_t topRank = 0; + // Number of rows with the highest rank in the partition. + vector_size_t numTopRankRows(); + + // Remove all rows with the highest rank in the partition. + char* removeTopRankRows(); + + // Returns true if the row at decodedVectors[index] has the same order by + // keys as another row in the partition's top rows. + bool isDuplicate( + const std::vector& decodedVectors, + vector_size_t index); + TopRows(HashStringAllocator* allocator, RowComparator& comparator) - : rows{{comparator}, StlAllocator(allocator)} {} + : rows{{comparator}, StlAllocator(allocator)}, + rowComparator(comparator) {} }; void initializeNewPartitions(); @@ -104,18 +119,21 @@ class TopNRowNumber : public Operator { // Handles input row when the partition has not yet accumulated 'limit' rows. // Returns a pointer to the row to add to the partition accumulator. + template char* processRowWithinLimit(vector_size_t index, TopRows& partition); // Handles input row when the partition has already accumulated 'limit' rows. // Returns a pointer to the row to add to the partition accumulator. + template char* processRowExceedingLimit(vector_size_t index, TopRows& partition); - // Loop to add each row to a partition or discard the row. - void processInputRowLoop(vector_size_t numInput); - // Adds input row to a partition or discards the row. + template void processInputRow(vector_size_t index, TopRows& partition); + template + void processInputRowLoop(vector_size_t numInput); + // Returns next partition to add to output or nullptr if there are no // partitions left. TopRows* nextPartition(); @@ -124,12 +142,25 @@ class TopNRowNumber : public Operator { // popped in reverse order of the row_number. // NOTE: This function erases the yielded output rows from the partition // and the next call starts with the remaining rows. + template void appendPartitionRows( TopRows& partition, vector_size_t numRows, vector_size_t outputOffset, FlatVector* rowNumbers); + // If there are many rows with the highest rank, then the topRank + // of the partition can oscillate between a very small value and a + // value > limit. Fix the partition for this condition before starting to + // output the partition. + vector_size_t fixTopRank(TopRows& partition); + + // Computes the rank for the next row to be output. + template + inline void computeNextRankInMemory( + const TopRows& partition, + vector_size_t outputIndex); + bool spillEnabled() const { return spillConfig_.has_value(); } @@ -141,26 +172,52 @@ class TopNRowNumber : public Operator { void setupSpiller(); + template RowVectorPtr getOutputFromSpill(); RowVectorPtr getOutputFromMemory(); + template + inline void computeNextRankInSpill( + const RowVectorPtr& output, + vector_size_t index, + const SpillMergeStream* next); + // Returns true if 'next' row belongs to a different partition then index-1 // row of output. bool isNewPartition( const RowVectorPtr& output, vector_size_t index, - SpillMergeStream* next); + const SpillMergeStream* next); + + // Returns true if 'next' row is a new peer (rows differ on order by keys) + // of the previous row in the partition (at output[index] of the + // output block). + bool isNewPeer( + const RowVectorPtr& output, + vector_size_t index, + const SpillMergeStream* next); - // Sets nextRowNumber_ to rowNumber. Checks if next row in 'merge_' belongs to + // Utility function to compare the columns from startColumn to endColumn + // between the output row at output[index] with the new row on the + // SpillMergeStream. Used by isNewPartition and isNewPeer. + bool compareSpillRowColumns( + const RowVectorPtr& output, + vector_size_t index, + const SpillMergeStream* next, + vector_size_t startColumn, + vector_size_t endColumn); + + // Checks if next row in 'merge_' belongs to // a different partition than last row in 'output' and if so updates - // nextRowNumber_ to 0. Also, checks current partition reached the limit on - // number of rows and if so advances 'merge_' to the first row on the next - // partition and sets nextRowNumber_ to 0. + // nextRank_ and numPeers_ to 1. Also, checks current partition reached + // the limit on rank and if so advances 'merge_' to the first row on the next + // partition and sets nextRank_ and numPeers_ to 0. // // @post 'merge_->next()' is either at end or points to a row that should be - // included in the next output batch using 'nextRowNumber_'. - void setupNextOutput(const RowVectorPtr& output, int32_t rowNumber); + // included in the next output batch using 'nextRank_'. + template + void setupNextOutput(const RowVectorPtr& output); // Called in noMoreInput() and spill(). void updateEstimatedOutputRowSize(); @@ -169,11 +226,15 @@ class TopNRowNumber : public Operator { // cardinality sufficiently. Returns false if spilling was triggered earlier. bool abandonPartialEarly() const; + // Rank function semantics of operator. + const core::TopNRowNumberNode::RankFunction rankFunction_; + const int32_t limit_; const bool generateRowNumber_; const size_t numPartitionKeys_; + const size_t numSortingKeys_; // Input columns in the order of: partition keys, sorting keys, the rest. const std::vector inputChannels_; @@ -260,7 +321,11 @@ class TopNRowNumber : public Operator { // Used to sort-merge spilled data. std::unique_ptr> merge_; - // Row number for the first row in the next output batch from the spiller. - int32_t nextRowNumber_{0}; + // Row number/rank or dense_rank for the first row in the next output batch + // from the spiller. + vector_size_t nextRank_{1}; + // Number of peers of first row in the previous output batch. This is used + // in rank calculation. + vector_size_t numPeers_{1}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/PlanNodeSerdeTest.cpp b/velox/exec/tests/PlanNodeSerdeTest.cpp index 54f5d77361b..48985a43074 100644 --- a/velox/exec/tests/PlanNodeSerdeTest.cpp +++ b/velox/exec/tests/PlanNodeSerdeTest.cpp @@ -571,24 +571,35 @@ TEST_F(PlanNodeSerdeTest, scan) { testSerde(plan); } -TEST_F(PlanNodeSerdeTest, topNRowNumber) { - auto plan = PlanBuilder() - .values({data_}) - .topNRowNumber({}, {"c0", "c2"}, 10, false) - .planNode(); +#define TOPN_SERDE_TEST(funcname) \ + auto plan = PlanBuilder() \ + .values({data_}) \ + .funcname({}, {"c0", "c2"}, 10, false) \ + .planNode(); \ + testSerde(plan); \ + \ + plan = PlanBuilder() \ + .values({data_}) \ + .funcname({}, {"c0", "c2"}, 10, true) \ + .planNode(); \ + testSerde(plan); \ + \ + plan = PlanBuilder() \ + .values({data_}) \ + .funcname({"c0"}, {"c1", "c2"}, 10, false) \ + .planNode(); \ testSerde(plan); - plan = PlanBuilder() - .values({data_}) - .topNRowNumber({}, {"c0", "c2"}, 10, true) - .planNode(); - testSerde(plan); +TEST_F(PlanNodeSerdeTest, topNRowNumber) { + TOPN_SERDE_TEST(topNRowNumber); +} - plan = PlanBuilder() - .values({data_}) - .topNRowNumber({"c0"}, {"c1", "c2"}, 10, false) - .planNode(); - testSerde(plan); +TEST_F(PlanNodeSerdeTest, topNRank) { + TOPN_SERDE_TEST(topNRank); +} + +TEST_F(PlanNodeSerdeTest, topNDemseRank) { + TOPN_SERDE_TEST(topNDenseRank); } TEST_F(PlanNodeSerdeTest, write) { diff --git a/velox/exec/tests/PlanNodeToStringTest.cpp b/velox/exec/tests/PlanNodeToStringTest.cpp index 0619aabb6c4..e5b2594544a 100644 --- a/velox/exec/tests/PlanNodeToStringTest.cpp +++ b/velox/exec/tests/PlanNodeToStringTest.cpp @@ -927,37 +927,54 @@ TEST_F(PlanNodeToStringTest, rowNumber) { plan->toString(true, false)); } -TEST_F(PlanNodeToStringTest, topNRowNumber) { - auto rowType = ROW({"a", "b"}, {BIGINT(), VARCHAR()}); - auto plan = PlanBuilder() - .tableScan(rowType) - .topNRowNumber({}, {"a DESC"}, 10, false) - .planNode(); - - ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); - ASSERT_EQ( - "-- TopNRowNumber[1][order by (a DESC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR\n", +#define TOPN_PLANNODE_TO_STRING_TEST(nodename, functionname) \ + auto rowType = ROW({"a", "b"}, {BIGINT(), VARCHAR()}); \ + auto plan = PlanBuilder() \ + .tableScan(rowType) \ + .nodename({}, {"a DESC"}, 10, false) \ + .planNode(); \ + \ + ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); \ + ASSERT_EQ( \ + fmt::format( \ + "-- TopNRowNumber[1][{} order by (a DESC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR\n", \ + functionname), \ + plan->toString(true, false)); \ + \ + plan = PlanBuilder() \ + .tableScan(rowType) \ + .nodename({}, {"a DESC"}, 10, true) \ + .planNode(); \ + \ + ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); \ + ASSERT_EQ( \ + fmt::format( \ + "-- TopNRowNumber[1][{} order by (a DESC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR, row_number:BIGINT\n", \ + functionname), \ + plan->toString(true, false)); \ + \ + plan = PlanBuilder() \ + .tableScan(rowType) \ + .nodename({"a"}, {"b"}, 10, false) \ + .planNode(); \ + \ + ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); \ + ASSERT_EQ( \ + fmt::format( \ + "-- TopNRowNumber[1][{} partition by (a) order by (b ASC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR\n", \ + functionname), \ plan->toString(true, false)); - plan = PlanBuilder() - .tableScan(rowType) - .topNRowNumber({}, {"a DESC"}, 10, true) - .planNode(); - - ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); - ASSERT_EQ( - "-- TopNRowNumber[1][order by (a DESC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR, row_number:BIGINT\n", - plan->toString(true, false)); +TEST_F(PlanNodeToStringTest, topNRowNumber) { + TOPN_PLANNODE_TO_STRING_TEST(topNRowNumber, "row_number"); +} - plan = PlanBuilder() - .tableScan(rowType) - .topNRowNumber({"a"}, {"b"}, 10, false) - .planNode(); +TEST_F(PlanNodeToStringTest, topNRank) { + TOPN_PLANNODE_TO_STRING_TEST(topNRank, "rank"); +} - ASSERT_EQ("-- TopNRowNumber[1]\n", plan->toString()); - ASSERT_EQ( - "-- TopNRowNumber[1][partition by (a) order by (b ASC NULLS LAST) limit 10] -> a:BIGINT, b:VARCHAR\n", - plan->toString(true, false)); +TEST_F(PlanNodeToStringTest, topNDenseRank) { + TOPN_PLANNODE_TO_STRING_TEST(topNDenseRank, "dense_rank"); } TEST_F(PlanNodeToStringTest, markDistinct) { diff --git a/velox/exec/tests/TopNRowNumberTest.cpp b/velox/exec/tests/TopNRowNumberTest.cpp index fac57658c2c..2868f06ded1 100644 --- a/velox/exec/tests/TopNRowNumberTest.cpp +++ b/velox/exec/tests/TopNRowNumberTest.cpp @@ -27,14 +27,101 @@ namespace facebook::velox::exec { namespace { +#define BUILD_TOPN(nodename) \ + planBuilder.values(values) \ + .nodename(partitionKeys, sortingKeys, limit, generateRowNumber) \ + .planNode() + +#define BUILD_TOPN_PLANNODEID(nodename) \ + planBuilder.values(values) \ + .nodename(partitionKeys, sortingKeys, limit, generateRowNumber) \ + .capturePlanNodeId(planNodeId) \ + .planNode() + +#define BUILD_TOPN_PARTIAL_FINAL(nodename) \ + planBuilder.values(values) \ + .nodename(partitionKeys, sortingKeys, limit, false) \ + .capturePlanNodeId(planNodeId) \ + .nodename(partitionKeys, sortingKeys, limit, true) \ + .planNode() + class TopNRowNumberTest : public OperatorTestBase { protected: - TopNRowNumberTest() { + explicit TopNRowNumberTest(core::TopNRowNumberNode::RankFunction function) + : function_(function) {} + + void SetUp() override { + exec::test::OperatorTestBase::SetUp(); filesystems::registerLocalFileSystem(); } + + protected: + const core::PlanNodePtr& topnNode( + PlanBuilder& planBuilder, + const std::vector& values, + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRowNumber) { + switch (function_) { + case core::TopNRowNumberNode::RankFunction::kRowNumber: + return BUILD_TOPN(topNRowNumber); + case core::TopNRowNumberNode::RankFunction::kRank: + return BUILD_TOPN(topNRank); + case core::TopNRowNumberNode::RankFunction::kDenseRank: + return BUILD_TOPN(topNDenseRank); + } + VELOX_UNREACHABLE(); + } + + const core::PlanNodePtr& topnNodeId( + PlanBuilder& planBuilder, + core::PlanNodeId& planNodeId, + const std::vector& values, + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRowNumber) { + switch (function_) { + case core::TopNRowNumberNode::RankFunction::kRowNumber: + return BUILD_TOPN_PLANNODEID(topNRowNumber); + case core::TopNRowNumberNode::RankFunction::kRank: + return BUILD_TOPN_PLANNODEID(topNRank); + case core::TopNRowNumberNode::RankFunction::kDenseRank: + return BUILD_TOPN_PLANNODEID(topNDenseRank); + } + VELOX_UNREACHABLE(); + } + + const core::PlanNodePtr& topnNodePartialFinal( + PlanBuilder& planBuilder, + core::PlanNodeId& planNodeId, + const std::vector& values, + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit) { + switch (function_) { + case core::TopNRowNumberNode::RankFunction::kRowNumber: + return BUILD_TOPN_PARTIAL_FINAL(topNRowNumber); + case core::TopNRowNumberNode::RankFunction::kRank: + return BUILD_TOPN_PARTIAL_FINAL(topNRank); + case core::TopNRowNumberNode::RankFunction::kDenseRank: + return BUILD_TOPN_PARTIAL_FINAL(topNDenseRank); + } + VELOX_UNREACHABLE(); + } + + const core::TopNRowNumberNode::RankFunction function_; }; -TEST_F(TopNRowNumberTest, basic) { +class MultiTopNRowNumberTest : public TopNRowNumberTest, + public testing::WithParamInterface< + core::TopNRowNumberNode::RankFunction> { + public: + MultiTopNRowNumberTest() : TopNRowNumberTest(GetParam()) {} +}; + +TEST_P(MultiTopNRowNumberTest, basic) { auto data = makeRowVector({ // Partitioning key. makeFlatVector({1, 1, 2, 2, 1, 2, 1}), @@ -48,40 +135,37 @@ TEST_F(TopNRowNumberTest, basic) { auto testLimit = [&](auto limit) { // Emit row numbers. - auto plan = PlanBuilder() - .values({data}) - .topNRowNumber({"c0"}, {"c1"}, limit, true) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = topnNode(planBuilder, {data}, {"c0"}, {"c1"}, limit, true); assertQuery( plan, fmt::format( - "SELECT * FROM (SELECT *, row_number() over (partition by c0 order by c1) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit)); // Do not emit row numbers. - plan = PlanBuilder() - .values({data}) - .topNRowNumber({"c0"}, {"c1"}, limit, false) - .planNode(); + auto planBuilder2 = PlanBuilder(); + plan = topnNode(planBuilder2, {data}, {"c0"}, {"c1"}, limit, false); assertQuery( plan, fmt::format( - "SELECT c0, c1, c2 FROM (SELECT *, row_number() over (partition by c0 order by c1) as rn FROM tmp) " + "SELECT c0, c1, c2 FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit)); // No partitioning keys. - plan = PlanBuilder() - .values({data}) - .topNRowNumber({}, {"c1"}, limit, true) - .planNode(); + auto planBuilder3 = PlanBuilder(); + plan = topnNode(planBuilder3, {data}, {}, {"c1"}, limit, true); assertQuery( plan, fmt::format( - "SELECT * FROM (SELECT *, row_number() over (order by c1) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (order by c1) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit)); }; @@ -91,7 +175,7 @@ TEST_F(TopNRowNumberTest, basic) { testLimit(5); } -TEST_F(TopNRowNumberTest, largeOutput) { +TEST_P(MultiTopNRowNumberTest, largeOutput) { // Make 10 vectors. Use different types for partitioning key, sorting key and // data. Use order of columns different from partitioning keys, followed by // sorting keys, followed by data. @@ -117,15 +201,14 @@ TEST_F(TopNRowNumberTest, largeOutput) { auto testLimit = [&](auto limit) { SCOPED_TRACE(fmt::format("Limit: {}", limit)); core::PlanNodeId topNRowNumberId; - auto plan = PlanBuilder() - .values(data) - .topNRowNumber({"p"}, {"s"}, limit, true) - .capturePlanNodeId(topNRowNumberId) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = topnNodeId( + planBuilder, topNRowNumberId, data, {"p"}, {"s"}, limit, true); auto sql = fmt::format( - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit); AssertQueryBuilder(plan, duckDbQueryRunner_) .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") @@ -152,16 +235,15 @@ TEST_F(TopNRowNumberTest, largeOutput) { } // No partitioning keys. - plan = PlanBuilder() - .values(data) - .topNRowNumber({}, {"s"}, limit, true) - .planNode(); + auto planBuilder2 = PlanBuilder(); + plan = topnNode(planBuilder2, data, {}, {"s"}, limit, true); AssertQueryBuilder(plan, duckDbQueryRunner_) .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") .assertResults(fmt::format( - "SELECT * FROM (SELECT *, row_number() over (order by s) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (order by s) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit)); }; @@ -172,7 +254,7 @@ TEST_F(TopNRowNumberTest, largeOutput) { testLimit(2000); } -TEST_F(TopNRowNumberTest, manyPartitions) { +TEST_P(MultiTopNRowNumberTest, manyPartitions) { const vector_size_t size = 10'000; auto data = split( makeRowVector( @@ -201,15 +283,14 @@ TEST_F(TopNRowNumberTest, manyPartitions) { auto testLimit = [&](auto limit, size_t outputBatchBytes = 1024) { SCOPED_TRACE(fmt::format("Limit: {}", limit)); core::PlanNodeId topNRowNumberId; - auto plan = PlanBuilder() - .values(data) - .topNRowNumber({"p"}, {"s"}, limit, true) - .capturePlanNodeId(topNRowNumberId) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = topnNodeId( + planBuilder, topNRowNumberId, data, {"p"}, {"s"}, limit, true); auto sql = fmt::format( - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit); assertQuery(plan, sql); @@ -243,7 +324,7 @@ TEST_F(TopNRowNumberTest, manyPartitions) { testLimit(1, 1); } -TEST_F(TopNRowNumberTest, fewPartitions) { +TEST_P(MultiTopNRowNumberTest, fewPartitions) { const vector_size_t size = 10'000; auto data = split( makeRowVector( @@ -272,15 +353,14 @@ TEST_F(TopNRowNumberTest, fewPartitions) { auto testLimit = [&](auto limit, size_t outputBatchBytes = 1024) { SCOPED_TRACE(fmt::format("Limit: {}", limit)); core::PlanNodeId topNRowNumberId; - auto plan = PlanBuilder() - .values(data) - .topNRowNumber({"p"}, {"s"}, limit, true) - .capturePlanNodeId(topNRowNumberId) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = topnNodeId( + planBuilder, topNRowNumberId, data, {"p"}, {"s"}, limit, true); auto sql = fmt::format( - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " " WHERE rn <= {}", + core::TopNRowNumberNode::rankFunctionName(function_), limit); assertQuery(plan, sql); @@ -312,7 +392,7 @@ TEST_F(TopNRowNumberTest, fewPartitions) { testLimit(100); } -TEST_F(TopNRowNumberTest, abandonPartialEarly) { +TEST_P(MultiTopNRowNumberTest, abandonPartialEarly) { auto data = makeRowVector( {"p", "s"}, { @@ -324,21 +404,19 @@ TEST_F(TopNRowNumberTest, abandonPartialEarly) { core::PlanNodeId topNRowNumberId; auto runPlan = [&](int32_t minRows) { - auto plan = PlanBuilder() - .values(split(data, 10)) - .topNRowNumber({"p"}, {"s"}, 99, false) - .capturePlanNodeId(topNRowNumberId) - .topNRowNumber({"p"}, {"s"}, 99, true) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = topnNodePartialFinal( + planBuilder, topNRowNumberId, split(data, 10), {"p"}, {"s"}, 99); auto task = AssertQueryBuilder(plan, duckDbQueryRunner_) .config( core::QueryConfig::kAbandonPartialTopNRowNumberMinRows, fmt::format("{}", minRows)) .config(core::QueryConfig::kAbandonPartialTopNRowNumberMinPct, "80") - .assertResults( - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " - "WHERE rn <= 99"); + .assertResults(fmt::format( + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " + "WHERE rn <= 99", + core::TopNRowNumberNode::rankFunctionName(function_))); return exec::toPlanStats(task->taskStats()); }; @@ -360,7 +438,7 @@ TEST_F(TopNRowNumberTest, abandonPartialEarly) { } } -TEST_F(TopNRowNumberTest, planNodeValidation) { +TEST_P(MultiTopNRowNumberTest, planNodeValidation) { auto data = makeRowVector( ROW({"a", "b", "c", "d", "e"}, { @@ -375,10 +453,8 @@ TEST_F(TopNRowNumberTest, planNodeValidation) { auto plan = [&](const std::vector& partitionKeys, const std::vector& sortingKeys, int32_t limit = 10) { - PlanBuilder() - .values({data}) - .topNRowNumber(partitionKeys, sortingKeys, limit, true) - .planNode(); + auto planBuilder = PlanBuilder(); + topnNode(planBuilder, {data}, partitionKeys, sortingKeys, limit, true); }; VELOX_ASSERT_THROW( @@ -403,15 +479,14 @@ TEST_F(TopNRowNumberTest, planNodeValidation) { plan({"a", "b"}, {"c"}, 0), "Limit must be greater than zero"); } -TEST_F(TopNRowNumberTest, maxSpillBytes) { +TEST_P(MultiTopNRowNumberTest, maxSpillBytes) { const auto rowType = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), VARCHAR()}); const auto vectors = createVectors(rowType, 1024, 15 << 20); auto planNodeIdGenerator = std::make_shared(); - auto plan = PlanBuilder(planNodeIdGenerator) - .values(vectors) - .topNRowNumber({"c0"}, {"c1"}, 100, true) - .planNode(); + auto planBuilder = PlanBuilder(planNodeIdGenerator); + auto plan = topnNode(planBuilder, vectors, {"c0"}, {"c1"}, 100, true); + struct { int32_t maxSpilledBytes; bool expectedExceedLimit; @@ -451,7 +526,7 @@ TEST_F(TopNRowNumberTest, maxSpillBytes) { // This test verifies that TopNRowNumber operator reclaim all the memory after // spill. -DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) { +DEBUG_ONLY_TEST_P(MultiTopNRowNumberTest, memoryUsageCheckAfterReclaim) { std::atomic_int inputCount{0}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::addInput", @@ -494,15 +569,14 @@ DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) { auto spillDirectory = exec::test::TempDirectoryPath::create(); core::PlanNodeId topNRowNumberId; - auto plan = PlanBuilder() - .values(data) - .topNRowNumber({"p"}, {"s"}, 1'000, true) - .capturePlanNodeId(topNRowNumberId) - .planNode(); - - const auto sql = - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " - " WHERE rn <= 1000"; + auto planBuilder = PlanBuilder(); + auto plan = + topnNodeId(planBuilder, topNRowNumberId, data, {"p"}, {"s"}, 1'000, true); + + const auto sql = fmt::format( + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " + " WHERE rn <= 1000", + core::TopNRowNumberNode::rankFunctionName(function_)); auto task = AssertQueryBuilder(plan, duckDbQueryRunner_) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") @@ -520,7 +594,7 @@ DEBUG_ONLY_TEST_F(TopNRowNumberTest, memoryUsageCheckAfterReclaim) { // This test verifies that TopNRowNumber operator can be closed twice which // might be triggered by memory pool abort. -DEBUG_ONLY_TEST_F(TopNRowNumberTest, doubleClose) { +DEBUG_ONLY_TEST_P(MultiTopNRowNumberTest, doubleClose) { const std::string errorMessage("doubleClose"); SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::noMoreInput", @@ -554,17 +628,24 @@ DEBUG_ONLY_TEST_F(TopNRowNumberTest, doubleClose) { 10); core::PlanNodeId topNRowNumberId; - auto plan = PlanBuilder() - .values(data) - .topNRowNumber({"p"}, {"s"}, 1'000, true) - .capturePlanNodeId(topNRowNumberId) - .planNode(); + auto planBuilder = PlanBuilder(); + auto plan = + topnNodeId(planBuilder, topNRowNumberId, data, {"p"}, {"s"}, 1'000, true); - const auto sql = - "SELECT * FROM (SELECT *, row_number() over (partition by p order by s) as rn FROM tmp) " - " WHERE rn <= 1000"; + const auto sql = fmt::format( + "SELECT * FROM (SELECT *, {}() over (partition by p order by s) as rn FROM tmp) " + " WHERE rn <= 1000", + core::TopNRowNumberNode::rankFunctionName(function_)); VELOX_ASSERT_THROW(assertQuery(plan, sql), errorMessage); } + +VELOX_INSTANTIATE_TEST_SUITE_P( + TopNRowNumberTest, + MultiTopNRowNumberTest, + testing::ValuesIn(std::vector( + {core::TopNRowNumberNode::RankFunction::kRowNumber, + core::TopNRowNumberNode::RankFunction::kRank, + core::TopNRowNumberNode::RankFunction::kDenseRank}))); } // namespace } // namespace facebook::velox::exec diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index c34d3bac23b..59a6d0eddde 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -2281,7 +2281,8 @@ PlanBuilder& PlanBuilder::rowNumber( return *this; } -PlanBuilder& PlanBuilder::topNRowNumber( +PlanBuilder& PlanBuilder::topNRowNumberBase( + core::TopNRowNumberNode::RankFunction function, const std::vector& partitionKeys, const std::vector& sortingKeys, int32_t limit, @@ -2295,6 +2296,7 @@ PlanBuilder& PlanBuilder::topNRowNumber( } planNode_ = std::make_shared( nextPlanNodeId(), + function, fields(partitionKeys), sortingFields, sortingOrders, @@ -2305,6 +2307,45 @@ PlanBuilder& PlanBuilder::topNRowNumber( return *this; } +PlanBuilder& PlanBuilder::topNRowNumber( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRowNumber) { + return topNRowNumberBase( + core::TopNRowNumberNode::RankFunction::kRowNumber, + partitionKeys, + sortingKeys, + limit, + generateRowNumber); +} + +PlanBuilder& PlanBuilder::topNRank( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRank) { + return topNRowNumberBase( + core::TopNRowNumberNode::RankFunction::kRank, + partitionKeys, + sortingKeys, + limit, + generateRank); +} + +PlanBuilder& PlanBuilder::topNDenseRank( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRank) { + return topNRowNumberBase( + core::TopNRowNumberNode::RankFunction::kDenseRank, + partitionKeys, + sortingKeys, + limit, + generateRank); +} + PlanBuilder& PlanBuilder::markDistinct( std::string markerKey, const std::vector& distinctKeys) { diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 82b222ac238..fc568a3b3b0 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -1286,6 +1286,22 @@ class PlanBuilder { int32_t limit, bool generateRowNumber); + /// Add a TopNRowNumberNode to compute single rank window function with + /// a limit applied to sorted partitions. + PlanBuilder& topNRank( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRank); + + /// Add a TopNRowNumberNode to compute single dense_rank window function with + /// a limit applied to sorted partitions. + PlanBuilder& topNDenseRank( + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRank); + /// Add a MarkDistinctNode to compute aggregate mask channel /// @param markerKey Name of output mask channel /// @param distinctKeys List of columns to be marked distinct. @@ -1461,6 +1477,13 @@ class PlanBuilder { const std::vector& windowFunctions, bool inputSorted); + PlanBuilder& topNRowNumberBase( + core::TopNRowNumberNode::RankFunction function, + const std::vector& partitionKeys, + const std::vector& sortingKeys, + int32_t limit, + bool generateRowNumber); + protected: core::PlanNodePtr planNode_; parse::ParseOptions options_;