From cb72f94ae73d69d72c37685e73770e2c26368dbb Mon Sep 17 00:00:00 2001 From: aditi-pandit Date: Mon, 23 Jun 2025 22:36:47 -0700 Subject: [PATCH] refactor: Abstract computeRankInMemory and computeRankInSpill methods for TopNRowNumber output loop --- velox/exec/TopNRowNumber.cpp | 96 +++++++++++++++++++++++++----------- velox/exec/TopNRowNumber.h | 42 ++++++++++++---- 2 files changed, 98 insertions(+), 40 deletions(-) diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 5bd614ffa1e..10e4b7f4504 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -213,6 +213,10 @@ void TopNRowNumber::addInput(RowVectorPtr input) { // Otherwise, check if row should replace an existing row or be discarded. processInputRowLoop(numInput); + // It is determined that the TopNRowNumber (as a partial) is not rejecting + // enough input rows to make the duplicate detection worthwhile. Hence, + // abandon the processing at this partial TopN and let the final TopN do + // the processing. if (abandonPartialEarly()) { abandonedPartial_ = true; addRuntimeStat("abandonedPartial", RuntimeCounter(1)); @@ -356,9 +360,14 @@ void TopNRowNumber::updateEstimatedOutputRowSize() { } TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { + auto setNextRank = [&](const TopRows& partition) { + nextRank_ = partition.topRank; + }; + if (!table_) { if (!outputPartitionNumber_) { outputPartitionNumber_ = 0; + setNextRank(*singlePartition_); return singlePartition_.get(); } return nullptr; @@ -384,7 +393,15 @@ TopNRowNumber::TopRows* TopNRowNumber::nextPartition() { } } - return &partitionAt(partitions_[outputPartitionNumber_.value()]); + auto partition = &partitionAt(partitions_[outputPartitionNumber_.value()]); + setNextRank(*partition); + return partition; +} + +void TopNRowNumber::computeNextRankInMemory( + const TopRows& /*partition*/, + vector_size_t /*rowIndex*/) { + nextRank_ -= 1; } void TopNRowNumber::appendPartitionRows( @@ -398,10 +415,13 @@ void TopNRowNumber::appendPartitionRows( for (auto i = 0; i < numRows; ++i) { const auto index = outputOffset + i; if (rowNumbers) { - rowNumbers->set(index, rowNumber--); + rowNumbers->set(index, nextRank_); } outputRows_[index] = partition.rows.top(); partition.rows.pop(); + if (!partition.rows.empty()) { + computeNextRankInMemory(partition, index); + } } } @@ -417,7 +437,7 @@ RowVectorPtr TopNRowNumber::getOutput() { return output; } - // We may have input accumulated in 'data_'. + // There could be older rows accumulated in 'data_'. if (data_->numRows() > 0) { return getOutputFromMemory(); } @@ -426,6 +446,7 @@ RowVectorPtr TopNRowNumber::getOutput() { finished_ = true; } + // There is no data to return at this moment. return nullptr; } @@ -433,6 +454,8 @@ RowVectorPtr TopNRowNumber::getOutput() { return nullptr; } + // All the input data is received, so the operator can start producing + // output. RowVectorPtr output; if (merge_ != nullptr) { output = getOutputFromSpill(); @@ -512,13 +535,15 @@ RowVectorPtr TopNRowNumber::getOutputFromMemory() { return output; } -bool TopNRowNumber::isNewPartition( +bool TopNRowNumber::compareSpillRowColumns( const RowVectorPtr& output, vector_size_t index, - SpillMergeStream* next) { + const SpillMergeStream* next, + vector_size_t startColumn, + vector_size_t endColumn) { VELOX_CHECK_GT(index, 0); - for (auto i = 0; i < numPartitionKeys_; ++i) { + for (auto i = startColumn; i < endColumn; ++i) { if (!output->childAt(inputChannels_[i]) ->equalValueAt( next->current().childAt(i).get(), @@ -530,22 +555,38 @@ bool TopNRowNumber::isNewPartition( return false; } -void TopNRowNumber::setupNextOutput( +// Compares the partition keys for new partitions. +bool TopNRowNumber::isNewPartition( const RowVectorPtr& output, - int32_t rowNumber) { - auto* lookAhead = merge_->next(); - if (lookAhead == nullptr) { - nextRowNumber_ = 0; + vector_size_t index, + const SpillMergeStream* next) { + return compareSpillRowColumns(output, index, next, 0, numPartitionKeys_); +} + +void TopNRowNumber::computeNextRankFromSpill( + const RowVectorPtr& output, + vector_size_t index, + const SpillMergeStream* next) { + if (isNewPartition(output, index, next)) { + nextRank_ = 1; return; } - if (isNewPartition(output, output->size(), lookAhead)) { - nextRowNumber_ = 0; + nextRank_ += 1; + return; +} + +void TopNRowNumber::setupNextOutput(const RowVectorPtr& output) { + auto resetNextRank = [this]() { nextRank_ = 1; }; + + auto* lookAhead = merge_->next(); + if (lookAhead == nullptr) { + resetNextRank(); return; } - nextRowNumber_ = rowNumber; - if (nextRowNumber_ < limit_) { + computeNextRankFromSpill(output, output->size(), lookAhead); + if (nextRank_ <= limit_) { return; } @@ -553,14 +594,14 @@ void TopNRowNumber::setupNextOutput( lookAhead->pop(); while (auto* next = merge_->next()) { if (isNewPartition(output, output->size(), next)) { - nextRowNumber_ = 0; + resetNextRank(); return; } next->pop(); } // This partition is the last partition. - nextRowNumber_ = 0; + resetNextRank(); } RowVectorPtr TopNRowNumber::getOutputFromSpill() { @@ -570,7 +611,7 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { // All rows from the same partition will appear together. // We'll identify partition boundaries by comparing partition keys of the // current row with the previous row. When new partition starts, we'll reset - // row number to zero. Once row number reaches the 'limit_', we'll start + // nextRank_ to zero. Once rank reaches the 'limit_', we'll start // dropping rows until the next partition starts. // We'll emit output every time we accumulate 'outputBatchSize_' rows. @@ -583,24 +624,20 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { // Index of the next row to append to output. vector_size_t index = 0; - - // Row number of the next row in the current partition. - vector_size_t rowNumber = nextRowNumber_; - VELOX_CHECK_LT(rowNumber, limit_); + VELOX_CHECK_LE(nextRank_, limit_); for (;;) { auto next = merge_->next(); if (next == nullptr) { break; } - // Check if this row comes from a new partition. - if (index > 0 && isNewPartition(output, index, next)) { - rowNumber = 0; + if (index > 0) { + computeNextRankFromSpill(output, index, next); } // Copy this row to the output buffer if this partition has // < limit_ rows output. - if (rowNumber < limit_) { + if (nextRank_ <= limit_) { for (auto i = 0; i < inputChannels_.size(); ++i) { output->childAt(inputChannels_[i]) ->copy( @@ -611,10 +648,9 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { } if (rowNumbers) { // Row numbers start with 1. - rowNumbers->set(index, rowNumber + 1); + rowNumbers->set(index, nextRank_); } ++index; - ++rowNumber; } // Pop this row from the spill. @@ -625,8 +661,8 @@ RowVectorPtr TopNRowNumber::getOutputFromSpill() { // Prepare the next batch : // i) If 'limit_' is reached for this partition, then skip the rows // until the next partition. - // ii) If the next row is from a new partition, then reset rowNumber_. - setupNextOutput(output, rowNumber); + // ii) If the next row is from a new partition, then reset nextRank_. + setupNextOutput(output); return output; } } diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index dc21f0f93c4..6df6a910cd7 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -120,6 +120,12 @@ class TopNRowNumber : public Operator { // partitions left. TopRows* nextPartition(); + // Computes the rank for the next row to be output + // (all output rows in memory). + void computeNextRankInMemory( + const TopRows& partition, + vector_size_t rowIndex); + // Appends numRows of the output partition the output. Note: The rows are // popped in reverse order of the row_number. // NOTE: This function erases the yielded output rows from the partition @@ -150,17 +156,32 @@ class TopNRowNumber : public Operator { bool isNewPartition( const RowVectorPtr& output, vector_size_t index, - SpillMergeStream* next); + const SpillMergeStream* next); + + // Utility method to compare values from startColumn to endColumn for + // 'next' row from SpillMergeStream with current row of output (at index). + bool compareSpillRowColumns( + const RowVectorPtr& output, + vector_size_t index, + const SpillMergeStream* next, + vector_size_t startColumn, + vector_size_t endColumn); + + // Computes next rank value for spill output. + void computeNextRankFromSpill( + const RowVectorPtr& output, + vector_size_t index, + const SpillMergeStream* next); - // Sets nextRowNumber_ to rowNumber. Checks if next row in 'merge_' belongs to - // a different partition than last row in 'output' and if so updates - // nextRowNumber_ to 0. Also, checks current partition reached the limit on - // number of rows and if so advances 'merge_' to the first row on the next - // partition and sets nextRowNumber_ to 0. + // Checks if next row in 'merge_' belongs to a different partition than last + // row in 'output' and if so updates nextRank_ to 1. + // Also, checks current partition reached the limit on number of rows and + // if so advances 'merge_' to the first row on the next + // partition and sets nextRank_ to 1. // // @post 'merge_->next()' is either at end or points to a row that should be - // included in the next output batch using 'nextRowNumber_'. - void setupNextOutput(const RowVectorPtr& output, int32_t rowNumber); + // included in the next output batch using 'nextRank_'. + void setupNextOutput(const RowVectorPtr& output); // Called in noMoreInput() and spill(). void updateEstimatedOutputRowSize(); @@ -260,7 +281,8 @@ class TopNRowNumber : public Operator { // Used to sort-merge spilled data. std::unique_ptr> merge_; - // Row number for the first row in the next output batch from the spiller. - int32_t nextRowNumber_{0}; + // Row number (or rank or dense_rank in the future) for the next row being + // output from memory or the spiller. + vector_size_t nextRank_{1}; }; } // namespace facebook::velox::exec