40 changes: 26 additions & 14 deletions velox/exec/TopNRowNumber.cpp
@@ -578,7 +578,7 @@

template <core::TopNRowNumberNode::RankFunction TRank>
void TopNRowNumber::computeNextRankInMemory(
const TopRows& partition,
TopRows& partition,
Contributor

Why can't this be const anymore? I don't see where it is modified below.

Collaborator Author

This function now calls partition.numTopRankRows(), which is a non-const member function, so the parameter can no longer be const.

Ideally we would do this without mutating the structure, but priority_queue only exposes its top element. So the only option was to pop() rows while checking top(), and then re-insert the popped rows.
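
The pop-and-reinsert approach described above can be sketched in isolation. This is a minimal illustration, not the Velox code: it assumes a plain `std::priority_queue<int>` instead of the `char*` rows and `RowComparator` used by `TopRows`, and `countTopPeers` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <vector>

// Hypothetical helper (not part of Velox): counts how many elements compare
// equal to the current top of the queue. std::priority_queue only exposes
// top(), so the peers are popped, counted, and then pushed back.
std::size_t countTopPeers(std::priority_queue<int>& queue) {
  assert(!queue.empty());
  const int top = queue.top();

  // Pop every element equal to the top, remembering it for re-insertion.
  std::vector<int> popped;
  while (!queue.empty() && queue.top() == top) {
    popped.push_back(queue.top());
    queue.pop();
  }

  const std::size_t count = popped.size();

  // Restore the queue to its original contents.
  for (const int value : popped) {
    queue.push(value);
  }
  return count;
}
```

The queue has to be passed by non-const reference because `pop()` and `push()` mutate it, even though its contents are the same before and after the call. That is the same reason the `partition` parameter loses its `const` qualifier in this change.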

vector_size_t outputIndex) {
if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kRowNumber) {
nextRank_ -= 1;
@@ -587,21 +587,19 @@

// This is the logic for rank() and dense_rank().
// If the next row is a peer of the current one, then the rank remains the
// same, but the number of peers is incremented.
// same.
Contributor

Does the peer counting have a bug? Why can't we use the current tracking? It seems that numTopRankRows is a bit more expensive than explicitly tracking the peers. Thanks!

Collaborator Author

@aditi-pandit aditi-pandit Feb 4, 2026

@xiaoxmeng:
The peer counting bug is what the fix at line 602 addresses.

To give an example:
Say the input has sort keys: 1, 2, 2, 3.
The ranks for these should be: 1, 2, 2, 4.

When emitting output rows, we go from the top of the queue downwards. We know that the top rank found so far is 4.
So the first output row is (3, 4). Peers of 3 = 1.

We then need to compute the rank of the new top of the queue (which is 2). Since 2 != 3, we know it's a new rank value. All rows with sort key 2 share the same rank, and the rank of the row just output (4) equals that rank plus the number of rows with sort key 2. As 2 has 2 peers, their rank should be 4 - 2 = 2.
So the next output is (2, 2).

The new row at the top of the queue is 2. As it's not a new sort key, the rank remains the same. So the next output is (2, 2). Peers of 2 = 2.

The new top of the queue is 1. As 1 != 2, it's a new rank. The current rank (2) equals the new rank plus the number of peers of 1. As the number of peers is 1, the new rank is 2 - 1 = 1.

If we were counting peers of the current row (old code), we would output ranks
1, 3, 3, 4, which is incorrect.

We can't hold back output rows until we see the next peer value, so finding the number of top rank rows is the best approach.

We end up calling numTopRankRows only once for each distinct sort key (not for each row), so it's not very expensive.
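
The walk-through above can be reproduced with a small standalone sketch. It is an illustration under simplifying assumptions, not the operator's code: sort keys are plain `int` values in a `std::priority_queue<int>`, and `numTopPeers` and `emitRanksDescending` are hypothetical names standing in for `TopRows::numTopRankRows()` and the output loop.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for TopRows::numTopRankRows(): counts the elements
// equal to the top of the queue by popping them and pushing them back.
std::size_t numTopPeers(std::priority_queue<int>& queue) {
  assert(!queue.empty());
  const int top = queue.top();
  std::vector<int> popped;
  while (!queue.empty() && queue.top() == top) {
    popped.push_back(queue.top());
    queue.pop();
  }
  for (const int value : popped) {
    queue.push(value);
  }
  return popped.size();
}

// Emits (sortKey, rank) pairs from the largest key to the smallest.
// 'topRank' is the rank() value of the largest key in the queue.
std::vector<std::pair<int, int>> emitRanksDescending(
    std::priority_queue<int> queue,
    int topRank) {
  std::vector<std::pair<int, int>> output;
  int rank = topRank;
  while (!queue.empty()) {
    const int current = queue.top();
    queue.pop();
    output.emplace_back(current, rank);

    // A new sort key starts a new rank. The rank drops by the number of
    // peers of the NEW top key, not by the peers of the key just output.
    if (!queue.empty() && queue.top() != current) {
      rank -= static_cast<int>(numTopPeers(queue));
    }
  }
  return output;
}
```

For sort keys 1, 2, 2, 3 and a top rank of 4, this produces (3, 4), (2, 2), (2, 2), (1, 1), matching the expected ranks 1, 2, 2, 4. The peer count is only computed when the sort key changes, so it runs once per distinct key.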

if (comparator_.compare(outputRows_[outputIndex], partition.rows.top()) ==
0) {
numPeers_ += 1;
return;
}

// The new row is not a peer of the current one. So dense_rank drops the
// rank by 1, but rank drops it by the number of peers (which is then
// reset).
// rank by 1, but rank drops by the number of peers of the new top
// row (new rank) in TopRows queue.
if constexpr (TRank == core::TopNRowNumberNode::RankFunction::kDenseRank) {
nextRank_ -= 1;
} else {
nextRank_ -= numPeers_;
numPeers_ = 1;
nextRank_ -= partition.numTopRankRows();
}
}

@@ -1111,18 +1109,32 @@

vector_size_t TopNRowNumber::TopRows::numTopRankRows() {
VELOX_CHECK(!rows.empty());

tempTopRankRows.clear();
Comment thread
xiaoxmeng marked this conversation as resolved.
SCOPE_EXIT {
tempTopRankRows.clear();
};
auto popAndSaveTopRow = [&]() {
tempTopRankRows.push_back(rows.top());
rows.pop();
};
Comment on lines +1117 to +1120
Copilot AI Feb 2, 2026

The lambda popAndSaveTopRow performs a simple two-line operation that is only called twice in close proximity. Inlining these operations directly would improve code readability and reduce unnecessary abstraction.

Comment thread
aditi-pandit marked this conversation as resolved.
char* topRow = rows.top();
vector_size_t numRows = 0;
const std::vector<char*, StlAllocator<char*>> partitionRowsVector =
PriorityQueueVector(rows);
for (const char* row : partitionRowsVector) {
if (rowComparator.compare(topRow, row) == 0) {
numRows += 1;
popAndSaveTopRow();
while (!rows.empty()) {
if (rowComparator.compare(topRow, rows.top()) == 0) {
popAndSaveTopRow();
} else {
break;
}
}
return numRows;

vector_size_t numTopRows = tempTopRankRows.size();

Check warning on line 1132 in velox/exec/TopNRowNumber.cpp (GitHub Actions / Build with GCC / Linux release with adapters)
bugprone-narrowing-conversions: narrowing conversion from 'size_type' (aka 'unsigned long') to signed type 'vector_size_t' (aka 'int') is implementation-defined

Check warning on line 1132 in velox/exec/TopNRowNumber.cpp (GitHub Actions / Build with GCC / Linux release with adapters)
misc-const-correctness: variable 'numTopRows' of type 'vector_size_t' (aka 'int') can be declared 'const'

// Re-insert all rows with the top rank row.
for (char* row : tempTopRankRows) {
rows.push(row);
}
return numTopRows;
}
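
One possible way to address the two clang-tidy warnings reported on line 1132 is an explicit, range-checked cast into a `const` local. The sketch below is only an illustration: `checkedSize` is a hypothetical helper, the `vector_size_t` alias is an assumption based on the "aka 'int'" text in the warning, and plain `assert` stands in for whatever check macro the codebase would use.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Assumption: a 32-bit signed alias, as suggested by "aka 'int'" in the
// warning text.
using vector_size_t = int32_t;

// Hypothetical helper: converts a container size to vector_size_t with an
// explicit cast, after checking that the value fits.
vector_size_t checkedSize(const std::vector<char*>& rows) {
  assert(
      rows.size() <=
      static_cast<std::size_t>(std::numeric_limits<vector_size_t>::max()));
  // The explicit cast makes the narrowing intentional, and 'const' documents
  // that the value is not modified afterwards.
  const auto numRows = static_cast<vector_size_t>(rows.size());
  return numRows;
}
```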

bool TopNRowNumber::TopRows::isDuplicate(
8 changes: 5 additions & 3 deletions velox/exec/TopNRowNumber.h
@@ -138,6 +138,9 @@ class TopNRowNumber : public Operator {
std::priority_queue<char*, std::vector<char*, StlAllocator<char*>>, Compare>
rows;

// Temporary storage for rows with the highest rank in the partition.
std::vector<char*, StlAllocator<char*>> tempTopRankRows;

RowComparator& rowComparator;

// This is the greatest rank seen so far in the input rows. Note: rank is
@@ -161,6 +164,7 @@ class TopNRowNumber : public Operator {

TopRows(HashStringAllocator* allocator, RowComparator& comparator)
: rows{{comparator}, StlAllocator<char*>(allocator)},
tempTopRankRows(StlAllocator<char*>(allocator)),
rowComparator(comparator) {}
};

@@ -209,9 +213,7 @@ class TopNRowNumber : public Operator {
// Computes the rank for the next row to be output
// (all output rows in memory).
template <core::TopNRowNumberNode::RankFunction TRank>
void computeNextRankInMemory(
const TopRows& partition,
vector_size_t rowIndex);
void computeNextRankInMemory(TopRows& partition, vector_size_t rowIndex);

// Appends numRows of the current partition to the output. Note: The rows are
// popped in reverse order of the rank.
60 changes: 60 additions & 0 deletions velox/exec/tests/TopNRowNumberTest.cpp
@@ -107,6 +107,66 @@ TEST_P(MultiTopNRowNumberTest, basic) {
testLimit(5);
}

TEST_P(MultiTopNRowNumberTest, basicWithPeers) {
auto data = makeRowVector({
// Partitioning key.
makeFlatVector<int64_t>({1, 1, 2, 2, 1, 2, 1, 1, 1, 1, 1}),
// Sorting key.
makeFlatVector<int64_t>({33, 11, 55, 44, 11, 22, 11, 11, 11, 33, 33}),
// Data. Mapping data to matching sorting keys to avoid ordering issues.
makeFlatVector<int64_t>({10, 50, 30, 40, 50, 60, 50, 50, 50, 10, 10}),
});

createDuckDbTable({data});

auto testLimit = [&](auto limit) {
// Emit row numbers.
auto plan = PlanBuilder()
.values({data})
.topNRank(functionName_, {"c0"}, {"c1"}, limit, true)
.planNode();
assertQuery(
plan,
fmt::format(
"SELECT * FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) "
" WHERE rn <= {}",
functionName_,
limit));

// Do not emit row numbers.
plan = PlanBuilder()
.values({data})
.topNRank(functionName_, {"c0"}, {"c1"}, limit, false)
.planNode();

assertQuery(
plan,
fmt::format(
"SELECT c0, c1, c2 FROM (SELECT *, {}() over (partition by c0 order by c1) as rn FROM tmp) "
" WHERE rn <= {}",
functionName_,
limit));

// No partitioning keys.
plan = PlanBuilder()
.values({data})
.topNRank(functionName_, {}, {"c1"}, limit, true)
.planNode();
assertQuery(
plan,
fmt::format(
"SELECT * FROM (SELECT *, {}() over (order by c1) as rn FROM tmp) "
" WHERE rn <= {}",
functionName_,
limit));
};

testLimit(1);
testLimit(2);
testLimit(3);
testLimit(5);
}

TEST_P(MultiTopNRowNumberTest, largeOutput) {
// Make 10 vectors. Use different types for partitioning key, sorting key and
// data. Use order of columns different from partitioning keys, followed by