From 5e1f0c0c2c9cb956ce79407bce633f2229f746a2 Mon Sep 17 00:00:00 2001 From: aditi-pandit Date: Sun, 1 Feb 2026 15:35:34 -0800 Subject: [PATCH] fix(TopNRowNumber) : Fix rank with peer computation When decrementing rank values while outputting rows from memory, the rank must drop by the number of peers of the lower-ranked row (the new top of the queue), not by the number of peers of the higher-ranked row that was just output. Also fix the computation of numTopRankRows to avoid iterating over the priority queue's underlying container vector, as that vector does not necessarily keep rows in rank order. --- velox/exec/TopNRowNumber.cpp | 40 +++++++++++------ velox/exec/TopNRowNumber.h | 8 ++-- velox/exec/tests/TopNRowNumberTest.cpp | 60 ++++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 17 deletions(-) diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 9be596497a3..862ed2f3aa7 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -578,7 +578,7 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { template void TopNRowNumber::computeNextRankInMemory( - const TopRows& partition, + TopRows& partition, vector_size_t outputIndex) { if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kRowNumber) { nextRank_ -= 1; @@ -587,21 +587,19 @@ void TopNRowNumber::computeNextRankInMemory( // This is the logic for rank() and dense_rank(). // If the next row is a peer of the current one, then the rank remains the - // same, but the number of peers is incremented. + // same. if (comparator_.compare(outputRows_[outputIndex], partition.rows.top()) == 0) { - numPeers_ += 1; return; } // The new row is not a peer of the current one. So dense_rank drops the - // rank by 1, but rank drops it by the number of peers (which is then - // reset). + // rank by 1, but rank drops by the number of peers of the new top + // row (new rank) in TopRows queue. 
if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kDenseRank) { nextRank_ -= 1; } else { - nextRank_ -= numPeers_; - numPeers_ = 1; + nextRank_ -= partition.numTopRankRows(); } } @@ -1111,18 +1109,32 @@ char* TopNRowNumber::TopRows::removeTopRankRows() { vector_size_t TopNRowNumber::TopRows::numTopRankRows() { VELOX_CHECK(!rows.empty()); + + tempTopRankRows.clear(); + SCOPE_EXIT { + tempTopRankRows.clear(); + }; + auto popAndSaveTopRow = [&]() { + tempTopRankRows.push_back(rows.top()); + rows.pop(); + }; + char* topRow = rows.top(); - vector_size_t numRows = 0; - const std::vector> partitionRowsVector = - PriorityQueueVector(rows); - for (const char* row : partitionRowsVector) { - if (rowComparator.compare(topRow, row) == 0) { - numRows += 1; + popAndSaveTopRow(); + while (!rows.empty()) { + if (rowComparator.compare(topRow, rows.top()) == 0) { + popAndSaveTopRow(); } else { break; } } - return numRows; + + vector_size_t numTopRows = tempTopRankRows.size(); + // Re-insert all rows with the top rank row. + for (char* row : tempTopRankRows) { + rows.push(row); + } + return numTopRows; } bool TopNRowNumber::TopRows::isDuplicate( diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index 64986943967..6ca70136939 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -138,6 +138,9 @@ class TopNRowNumber : public Operator { std::priority_queue>, Compare> rows; + // Temporary storage for rows with the highest rank in the partition. + std::vector> tempTopRankRows; + RowComparator& rowComparator; // This is the greatest rank seen so far in the input rows. 
Note: rank is @@ -161,6 +164,7 @@ class TopNRowNumber : public Operator { TopRows(HashStringAllocator* allocator, RowComparator& comparator) : rows{{comparator}, StlAllocator(allocator)}, + tempTopRankRows(StlAllocator(allocator)), rowComparator(comparator) {} }; @@ -209,9 +213,7 @@ class TopNRowNumber : public Operator { // Computes the rank for the next row to be output // (all output rows in memory). template - void computeNextRankInMemory( - const TopRows& partition, - vector_size_t rowIndex); + void computeNextRankInMemory(TopRows& partition, vector_size_t rowIndex); // Appends numRows of the current partition to the output. Note: The rows are // popped in reverse order of the rank. diff --git a/velox/exec/tests/TopNRowNumberTest.cpp b/velox/exec/tests/TopNRowNumberTest.cpp index 7b21a3dee4b..55a140692b3 100644 --- a/velox/exec/tests/TopNRowNumberTest.cpp +++ b/velox/exec/tests/TopNRowNumberTest.cpp @@ -107,6 +107,66 @@ TEST_P(MultiTopNRowNumberTest, basic) { testLimit(5); } +TEST_P(MultiTopNRowNumberTest, basicWithPeers) { + auto data = makeRowVector({ + // Partitioning key. + makeFlatVector({1, 1, 2, 2, 1, 2, 1, 1, 1, 1, 1}), + // Sorting key. + makeFlatVector({33, 11, 55, 44, 11, 22, 11, 11, 11, 33, 33}), + // Data. Mapping data to matching sorting keys to avoid ordering issues. + makeFlatVector({10, 50, 30, 40, 50, 60, 50, 50, 50, 10, 10}), + }); + + createDuckDbTable({data}); + + auto testLimit = [&](auto limit) { + // Emit row numbers. + auto plan = PlanBuilder() + .values({data}) + .topNRank(functionName_, {"c0"}, {"c1"}, limit, true) + .planNode(); + assertQuery( + plan, + fmt::format( + "SELECT * FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) " + " WHERE rn <= {}", + functionName_, + limit)); + + // Do not emit row numbers. 
+ plan = PlanBuilder() + .values({data}) + .topNRank(functionName_, {"c0"}, {"c1"}, limit, false) + .planNode(); + + assertQuery( + plan, + fmt::format( + "SELECT c0, c1, c2 FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) " + " WHERE rn <= {}", + functionName_, + limit)); + + // No partitioning keys. + plan = PlanBuilder() + .values({data}) + .topNRank(functionName_, {}, {"c1"}, limit, true) + .planNode(); + assertQuery( + plan, + fmt::format( + "SELECT * FROM (SELECT *, {}() over (order by c1) as rn FROM tmp) " + " WHERE rn <= {}", + functionName_, + limit)); + }; + + testLimit(1); + testLimit(2); + testLimit(3); + testLimit(5); +} + TEST_P(MultiTopNRowNumberTest, largeOutput) { // Make 10 vectors. Use different types for partitioning key, sorting key and // data. Use order of columns different from partitioning keys, followed by