Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 66 additions & 30 deletions velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ void TopNRowNumber::addInput(RowVectorPtr input) {
// Otherwise, check if row should replace an existing row or be discarded.
processInputRowLoop(numInput);

// It is determined that the TopNRowNumber (as a partial) is not rejecting
// enough input rows to make the duplicate detection worthwhile. Hence,
// abandon the processing at this partial TopN and let the final TopN do
// the processing.
if (abandonPartialEarly()) {
abandonedPartial_ = true;
addRuntimeStat("abandonedPartial", RuntimeCounter(1));
Expand Down Expand Up @@ -356,9 +360,14 @@ void TopNRowNumber::updateEstimatedOutputRowSize() {
}

TopNRowNumber::TopRows* TopNRowNumber::nextPartition() {
auto setNextRank = [&](const TopRows& partition) {
nextRank_ = partition.topRank;
};

if (!table_) {
if (!outputPartitionNumber_) {
outputPartitionNumber_ = 0;
setNextRank(*singlePartition_);
return singlePartition_.get();
}
return nullptr;
Expand All @@ -384,7 +393,15 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() {
}
}

return &partitionAt(partitions_[outputPartitionNumber_.value()]);
auto partition = &partitionAt(partitions_[outputPartitionNumber_.value()]);
setNextRank(*partition);
return partition;
}

// Moves 'nextRank_' back by one so that it holds the value to assign to the
// next row output from memory. Neither argument is currently used.
void TopNRowNumber::computeNextRankInMemory(
    const TopRows& /*partition*/,
    vector_size_t /*rowIndex*/) {
  --nextRank_;
}

void TopNRowNumber::appendPartitionRows(
Expand All @@ -398,10 +415,13 @@ void TopNRowNumber::appendPartitionRows(
for (auto i = 0; i < numRows; ++i) {
const auto index = outputOffset + i;
if (rowNumbers) {
rowNumbers->set(index, rowNumber--);
rowNumbers->set(index, nextRank_);
}
outputRows_[index] = partition.rows.top();
partition.rows.pop();
if (!partition.rows.empty()) {
computeNextRankInMemory(partition, index);
}
}
}

Expand All @@ -417,7 +437,7 @@ RowVectorPtr TopNRowNumber::getOutput() {
return output;
}

// We may have input accumulated in 'data_'.
// There could be older rows accumulated in 'data_'.
if (data_->numRows() > 0) {
return getOutputFromMemory();
}
Expand All @@ -426,13 +446,16 @@ RowVectorPtr TopNRowNumber::getOutput() {
finished_ = true;
}

// There is no data to return at this moment.
return nullptr;
}

if (!noMoreInput_) {
return nullptr;
}

// All the input data is received, so the operator can start producing
// output.
RowVectorPtr output;
if (merge_ != nullptr) {
output = getOutputFromSpill();
Expand Down Expand Up @@ -512,13 +535,15 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() {
return output;
}

bool TopNRowNumber::isNewPartition(
bool TopNRowNumber::compareSpillRowColumns(
const RowVectorPtr& output,
vector_size_t index,
SpillMergeStream* next) {
const SpillMergeStream* next,
vector_size_t startColumn,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we pass startColumn and endColumn rather than just 0 and numPartitionKeys_? And why is this called compareSpillRowColumns?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiaoxmeng : This code will be enhanced to do the TopN optimization for rank functions as well, and those need to compare rows on the order-by columns. So I abstracted the comparison into compareSpillRowColumns, which will be reused to compare the order-by columns as well.

vector_size_t endColumn) {
VELOX_CHECK_GT(index, 0);

for (auto i = 0; i < numPartitionKeys_; ++i) {
for (auto i = startColumn; i < endColumn; ++i) {
if (!output->childAt(inputChannels_[i])
->equalValueAt(
next->current().childAt(i).get(),
Expand All @@ -530,37 +555,53 @@ bool TopNRowNumber::isNewPartition(
return false;
}

void TopNRowNumber::setupNextOutput(
// Compares the partition keys for new partitions.
bool TopNRowNumber::isNewPartition(
Comment thread
xiaoxmeng marked this conversation as resolved.
const RowVectorPtr& output,
int32_t rowNumber) {
auto* lookAhead = merge_->next();
if (lookAhead == nullptr) {
nextRowNumber_ = 0;
vector_size_t index,
const SpillMergeStream* next) {
return compareSpillRowColumns(output, index, next, 0, numPartitionKeys_);
}

void TopNRowNumber::computeNextRankFromSpill(
const RowVectorPtr& output,
vector_size_t index,
const SpillMergeStream* next) {
if (isNewPartition(output, index, next)) {
nextRank_ = 1;
return;
}

if (isNewPartition(output, output->size(), lookAhead)) {
nextRowNumber_ = 0;
nextRank_ += 1;
return;
}

void TopNRowNumber::setupNextOutput(const RowVectorPtr& output) {
auto resetNextRank = [this]() { nextRank_ = 1; };
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you reset nextRank_ to 1 and use <= in place of the < used before?

Copy link
Copy Markdown
Collaborator Author

@aditi-pandit aditi-pandit Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinchengchenghh I will be updating this logic for both rank and dense_rank computation in the next PR.

In rank and dense_rank the result value changes only if the order-by keys differ between the current row and the next. Setting the starting rank to 0 and then incrementing it in place when a row differs from the adjacent row is complicated logic. It's simpler to set the current rank to 1 and increment it when advancing to the next row. So I changed how the result value is computed.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too familiar with this operator.
We can use 1 because we have at least one row result (if it doesn't match any other row) that has rank/row_number 1.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@czentgr : Yes.


auto* lookAhead = merge_->next();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just call this next?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop below uses the next variable naming at line 595, so I stuck with the original lookAhead name here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean let's just use next. Why have two variables, next and lookAhead?

if (lookAhead == nullptr) {
resetNextRank();
return;
}

nextRowNumber_ = rowNumber;
if (nextRowNumber_ < limit_) {
computeNextRankFromSpill(output, output->size(), lookAhead);
if (nextRank_ <= limit_) {
return;
}

// Skip remaining rows for this partition.
lookAhead->pop();
while (auto* next = merge_->next()) {
if (isNewPartition(output, output->size(), next)) {
nextRowNumber_ = 0;
resetNextRank();
return;
}
next->pop();
}

// This partition is the last partition.
nextRowNumber_ = 0;
resetNextRank();
}

RowVectorPtr TopNRowNumber::getOutputFromSpill() {
Expand All @@ -570,7 +611,7 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() {
// All rows from the same partition will appear together.
// We'll identify partition boundaries by comparing partition keys of the
// current row with the previous row. When new partition starts, we'll reset
// row number to zero. Once row number reaches the 'limit_', we'll start
// nextRank_ to one. Once rank exceeds the 'limit_', we'll start
// dropping rows until the next partition starts.
// We'll emit output every time we accumulate 'outputBatchSize_' rows.

Expand All @@ -583,24 +624,20 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() {

// Index of the next row to append to output.
vector_size_t index = 0;

// Row number of the next row in the current partition.
vector_size_t rowNumber = nextRowNumber_;
VELOX_CHECK_LT(rowNumber, limit_);
VELOX_CHECK_LE(nextRank_, limit_);
for (;;) {
auto next = merge_->next();
if (next == nullptr) {
break;
}

// Check if this row comes from a new partition.
if (index > 0 && isNewPartition(output, index, next)) {
rowNumber = 0;
if (index > 0) {
computeNextRankFromSpill(output, index, next);
}

// Copy this row to the output buffer if this partition has
// < limit_ rows output.
if (rowNumber < limit_) {
if (nextRank_ <= limit_) {
for (auto i = 0; i < inputChannels_.size(); ++i) {
output->childAt(inputChannels_[i])
->copy(
Expand All @@ -611,10 +648,9 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() {
}
if (rowNumbers) {
// Row numbers start with 1.
rowNumbers->set(index, rowNumber + 1);
rowNumbers->set(index, nextRank_);
}
++index;
++rowNumber;
}

// Pop this row from the spill.
Expand All @@ -625,8 +661,8 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() {
// Prepare the next batch :
// i) If 'limit_' is reached for this partition, then skip the rows
// until the next partition.
// ii) If the next row is from a new partition, then reset rowNumber_.
setupNextOutput(output, rowNumber);
// ii) If the next row is from a new partition, then reset nextRank_.
setupNextOutput(output);
return output;
}
}
Expand Down
42 changes: 32 additions & 10 deletions velox/exec/TopNRowNumber.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ class TopNRowNumber : public Operator {
// partitions left.
TopRows* nextPartition();

// Computes the rank for the next row to be output
// (all output rows in memory).
void computeNextRankInMemory(
const TopRows& partition,
vector_size_t rowIndex);

// Appends numRows of the output partition to the output. Note: The rows are
// popped in reverse order of the row_number.
// NOTE: This function erases the yielded output rows from the partition
Expand Down Expand Up @@ -150,17 +156,32 @@ class TopNRowNumber : public Operator {
bool isNewPartition(
const RowVectorPtr& output,
vector_size_t index,
SpillMergeStream* next);
const SpillMergeStream* next);

// Utility method to compare values from startColumn to endColumn for
// 'next' row from SpillMergeStream with current row of output (at index).
bool compareSpillRowColumns(
const RowVectorPtr& output,
vector_size_t index,
const SpillMergeStream* next,
vector_size_t startColumn,
vector_size_t endColumn);

// Computes next rank value for spill output.
void computeNextRankFromSpill(
const RowVectorPtr& output,
vector_size_t index,
const SpillMergeStream* next);

// Sets nextRowNumber_ to rowNumber. Checks if next row in 'merge_' belongs to
// a different partition than last row in 'output' and if so updates
// nextRowNumber_ to 0. Also, checks current partition reached the limit on
// number of rows and if so advances 'merge_' to the first row on the next
// partition and sets nextRowNumber_ to 0.
// Checks if next row in 'merge_' belongs to a different partition than last
// row in 'output' and if so updates nextRank_ to 1.
// Also, checks current partition reached the limit on number of rows and
// if so advances 'merge_' to the first row on the next
// partition and sets nextRank_ to 1.
//
// @post 'merge_->next()' is either at end or points to a row that should be
// included in the next output batch using 'nextRowNumber_'.
void setupNextOutput(const RowVectorPtr& output, int32_t rowNumber);
// included in the next output batch using 'nextRank_'.
void setupNextOutput(const RowVectorPtr& output);

// Called in noMoreInput() and spill().
void updateEstimatedOutputRowSize();
Expand Down Expand Up @@ -260,7 +281,8 @@ class TopNRowNumber : public Operator {
// Used to sort-merge spilled data.
std::unique_ptr<TreeOfLosers<SpillMergeStream>> merge_;

// Row number for the first row in the next output batch from the spiller.
int32_t nextRowNumber_{0};
// Row number (or rank or dense_rank in the future) for the next row being
// output from memory or the spiller.
vector_size_t nextRank_{1};
};
} // namespace facebook::velox::exec
Loading