diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 9be596497a3..862ed2f3aa7 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -578,7 +578,7 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { template void TopNRowNumber::computeNextRankInMemory( - const TopRows& partition, + TopRows& partition, vector_size_t outputIndex) { if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kRowNumber) { nextRank_ -= 1; @@ -587,21 +587,19 @@ void TopNRowNumber::computeNextRankInMemory( // This is the logic for rank() and dense_rank(). // If the next row is a peer of the current one, then the rank remains the - // same, but the number of peers is incremented. + // same. if (comparator_.compare(outputRows_[outputIndex], partition.rows.top()) == 0) { - numPeers_ += 1; return; } // The new row is not a peer of the current one. So dense_rank drops the - // rank by 1, but rank drops it by the number of peers (which is then - // reset). + // rank by 1, but rank drops by the number of peers of the new top + // row (new rank) in TopRows queue. if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kDenseRank) { nextRank_ -= 1; } else { - nextRank_ -= numPeers_; - numPeers_ = 1; + nextRank_ -= partition.numTopRankRows(); } } @@ -1111,18 +1109,32 @@ char* TopNRowNumber::TopRows::removeTopRankRows() { vector_size_t TopNRowNumber::TopRows::numTopRankRows() { VELOX_CHECK(!rows.empty()); + + tempTopRankRows.clear(); + SCOPE_EXIT { + tempTopRankRows.clear(); + }; + auto popAndSaveTopRow = [&]() { + tempTopRankRows.push_back(rows.top()); + rows.pop(); + }; + char* topRow = rows.top(); - vector_size_t numRows = 0; - const std::vector> partitionRowsVector = - PriorityQueueVector(rows); - for (const char* row : partitionRowsVector) { - if (rowComparator.compare(topRow, row) == 0) { - numRows += 1; + popAndSaveTopRow(); + while (!rows.empty()) { + if (rowComparator.compare(topRow, rows.top()) == 0) { + popAndSaveTopRow(); } else { break; } } - return numRows; + + vector_size_t numTopRows = tempTopRankRows.size(); + // Re-insert all rows with the top rank row. + for (char* row : tempTopRankRows) { + rows.push(row); + } + return numTopRows; } bool TopNRowNumber::TopRows::isDuplicate( diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index 64986943967..6ca70136939 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -138,6 +138,9 @@ class TopNRowNumber : public Operator { std::priority_queue>, Compare> rows; + // Temporary storage for rows with the highest rank in the partition. + std::vector> tempTopRankRows; + RowComparator& rowComparator; // This is the greatest rank seen so far in the input rows. Note: rank is @@ -161,6 +164,7 @@ class TopNRowNumber : public Operator { TopRows(HashStringAllocator* allocator, RowComparator& comparator) : rows{{comparator}, StlAllocator(allocator)}, + tempTopRankRows(StlAllocator(allocator)), rowComparator(comparator) {} }; @@ -209,9 +213,7 @@ class TopNRowNumber : public Operator { // Computes the rank for the next row to be output // (all output rows in memory). template - void computeNextRankInMemory( - const TopRows& partition, - vector_size_t rowIndex); + void computeNextRankInMemory(TopRows& partition, vector_size_t rowIndex); // Appends numRows of the current partition to the output. Note: The rows are // popped in reverse order of the rank. diff --git a/velox/exec/tests/TopNRowNumberTest.cpp b/velox/exec/tests/TopNRowNumberTest.cpp index 7b21a3dee4b..55a140692b3 100644 --- a/velox/exec/tests/TopNRowNumberTest.cpp +++ b/velox/exec/tests/TopNRowNumberTest.cpp @@ -107,6 +107,66 @@ TEST_P(MultiTopNRowNumberTest, basic) { testLimit(5); } +TEST_P(MultiTopNRowNumberTest, basicWithPeers) { + auto data = makeRowVector({ + // Partitioning key. + makeFlatVector({1, 1, 2, 2, 1, 2, 1, 1, 1, 1, 1}), + // Sorting key. + makeFlatVector({33, 11, 55, 44, 11, 22, 11, 11, 11, 33, 33}), + // Data. Mapping data to matching sorting keys to avoid ordering issues. + makeFlatVector({10, 50, 30, 40, 50, 60, 50, 50, 50, 10, 10}), + }); + + createDuckDbTable({data}); + + auto testLimit = [&](auto limit) { + // Emit row numbers. + auto plan = PlanBuilder() + .values({data}) + .topNRank(functionName_, {"c0"}, {"c1"}, limit, true) + .planNode(); + assertQuery( + plan, + fmt::format( + "SELECT * FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) " + " WHERE rn <= {}", + functionName_, + limit)); + + // Do not emit row numbers. + plan = PlanBuilder() + .values({data}) + .topNRank(functionName_, {"c0"}, {"c1"}, limit, false) + .planNode(); + + assertQuery( + plan, + fmt::format( + "SELECT c0, c1, c2 FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) " + " WHERE rn <= {}", + functionName_, + limit)); + + // No partitioning keys. + plan = PlanBuilder() + .values({data}) + .topNRank(functionName_, {}, {"c1"}, limit, true) + .planNode(); + assertQuery( + plan, + fmt::format( + "SELECT * FROM (SELECT *, {}() over (order by c1) as rn FROM tmp) " + " WHERE rn <= {}", + functionName_, + limit)); + }; + + testLimit(1); + testLimit(2); + testLimit(3); + testLimit(5); +} + TEST_P(MultiTopNRowNumberTest, largeOutput) { // Make 10 vectors. Use different types for partitioning key, sorting key and // data. Use order of columns different from partitioning keys, followed by