diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 42202e2e78a..61cda574177 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -205,13 +205,10 @@ void TopNRowNumber::addInput(RowVectorPtr input) { // Initialize new partitions. initializeNewPartitions(); - // Process input rows. For each row, lookup the partition. If number of rows - // in that partition is less than limit, add the new row. Otherwise, check - // if row should replace an existing row or be discarded. - for (auto i = 0; i < numInput; ++i) { - auto& partition = partitionAt(lookup_->hits[i]); - processInputRow(i, partition); - } + // Process input rows. For each row, lookup the partition. If the highest + // (top) rank in that partition is less than limit, add the new row. + // Otherwise, check if row should replace an existing row or be discarded. + processInputRowLoop(numInput); if (abandonPartialEarly()) { abandonedPartial_ = true; @@ -222,9 +219,7 @@ void TopNRowNumber::addInput(RowVectorPtr input) { outputRows_.resize(outputBatchSize_); } } else { - for (auto i = 0; i < numInput; ++i) { - processInputRow(i, *singlePartition_); - } + processInputRowLoop(numInput); } } @@ -304,6 +299,18 @@ void TopNRowNumber::processInputRow(vector_size_t index, TopRows& partition) { topRows.push(newRow); } +void TopNRowNumber::processInputRowLoop(vector_size_t numInput) { + if (table_) { + for (auto i = 0; i < numInput; ++i) { + processInputRow(i, partitionAt(lookup_->hits[i])); + } + } else { + for (auto i = 0; i < numInput; ++i) { + processInputRow(i, *singlePartition_); + } + } +} + void TopNRowNumber::noMoreInput() { Operator::noMoreInput(); diff --git a/velox/exec/TopNRowNumber.h b/velox/exec/TopNRowNumber.h index bf20668e630..dc21f0f93c4 100644 --- a/velox/exec/TopNRowNumber.h +++ b/velox/exec/TopNRowNumber.h @@ -110,6 +110,9 @@ class TopNRowNumber : public Operator { // Returns a pointer to the row to add to the partition accumulator. char* processRowExceedingLimit(vector_size_t index, TopRows& partition); + // Loop to add each row to a partition or discard the row. + void processInputRowLoop(vector_size_t numInput); + // Adds input row to a partition or discards the row. void processInputRow(vector_size_t index, TopRows& partition);