diff --git a/velox/exec/NestedLoopJoinProbe.cpp b/velox/exec/NestedLoopJoinProbe.cpp index f84d0484d1f..8a882478be7 100644 --- a/velox/exec/NestedLoopJoinProbe.cpp +++ b/velox/exec/NestedLoopJoinProbe.cpp @@ -261,14 +261,36 @@ RowVectorPtr NestedLoopJoinProbe::generateOutput() { // Try to advance the probe cursor; call finish if no more probe input. if (advanceProbe()) { finishProbeInput(); + if (numOutputRows_ == 0) { + // output_ can only be re-used across probe rows within the same input_. + // Here we have to abandon the empty output_ with memory allocated. + output_ = nullptr; + } } - if (output_ != nullptr && output_->size() == 0) { - output_ = nullptr; + if (!readyToProduceOutput()) { + return nullptr; } + + output_->resize(numOutputRows_); return std::move(output_); } +bool NestedLoopJoinProbe::readyToProduceOutput() { + if (!output_ || numOutputRows_ == 0) { + return false; + } + + // For cross join, output is produced directly + if (isCrossJoin()) { + return true; + } + + // if the input_ has no remaining rows or the output_ is fully filled, + // it's the right time for output. + return !input_ || numOutputRows_ >= outputBatchSize_; +} + bool NestedLoopJoinProbe::advanceProbe() { if (hasProbedAllBuildData()) { probeRow_ += probeRowCount_; @@ -357,9 +379,6 @@ bool NestedLoopJoinProbe::addToOutput() { // Check if the current probed row needs to be added as a mismatch (for left // and full outer joins). checkProbeMismatchRow(); - if (output_ != nullptr) { - output_->resize(numOutputRows_); - } // Signals that all input has been generated for the probeRow and build // vectors; safe to move to the next probe record. diff --git a/velox/exec/NestedLoopJoinProbe.h b/velox/exec/NestedLoopJoinProbe.h index b59457b118a..b4752052dea 100644 --- a/velox/exec/NestedLoopJoinProbe.h +++ b/velox/exec/NestedLoopJoinProbe.h @@ -113,12 +113,19 @@ class NestedLoopJoinProbe : public Operator { // smaller in some cases - outputs follow the probe side buffer boundaries. 
RowVectorPtr generateOutput(); + // For non cross-join mode, the `output_` can be reused across multiple probe + // rows. If the input_ has remaining rows and the output_ is not fully filled, + // it returns false here. + bool readyToProduceOutput(); + // Fill in joined output to `output_` by matching the current probeRow_ and // successive build vectors (using getNextCrossProductBatch()). Stops when // either all build vectors were matched for the current probeRow (returns // true), or if the output is full (returns false). If it returns false, a // valid vector with more than zero records will be available at `output_`; if // it returns true, either nullptr or zero records may be placed at `output_`. + // Also if it returns true, it's the caller's responsibility to decide when to + // set `output_` size. // // Also updates `buildMatched_` if the build records that received a match, so // that they can be used to implement right and full outer join semantic once