diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 15b78f19033..b0aea5aa933 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -1669,7 +1669,8 @@ class MergeJoinNode : public AbstractJoinNode { /// /// Nested loop join supports both equal and non-equal joins. Expressions /// specified in joinCondition are evaluated on every combination of left/right -/// tuple, to emit result. +/// tuple, to emit result. Results are emitted following the same input order of +/// probe rows for inner and left joins, for each thread of execution. /// /// To create Cartesian product of the left/right's output, use the constructor /// without `joinType` and `joinCondition` parameter. diff --git a/velox/docs/develop/operators.rst b/velox/docs/develop/operators.rst index 7ea22ea2fcf..b7cac0c915e 100644 --- a/velox/docs/develop/operators.rst +++ b/velox/docs/develop/operators.rst @@ -562,7 +562,9 @@ NestedLoopJoinNode NestedLoopJoinNode represents an implementation that iterates through each row from the left side of the join and, for each row, iterates through all rows from the right side of the join, comparing them based on the join condition to find matching rows -and emitting results. Nested loop join supports non-equality join. +and emitting results. Nested loop join supports non-equality joins, and emits output +rows in the same order as the probe input (for inner and left outer joins) for each +thread of execution. .. 
list-table:: :widths: 10 30 diff --git a/velox/exec/NestedLoopJoinProbe.cpp b/velox/exec/NestedLoopJoinProbe.cpp index 74253b427ee..cf2656b4924 100644 --- a/velox/exec/NestedLoopJoinProbe.cpp +++ b/velox/exec/NestedLoopJoinProbe.cpp @@ -19,8 +19,8 @@ #include "velox/expression/FieldReference.h" namespace facebook::velox::exec { - namespace { + bool needsProbeMismatch(core::JoinType joinType) { return isLeftJoin(joinType) || isFullJoin(joinType); } @@ -42,6 +42,7 @@ std::vector extractProjections( } return projections; } + } // namespace NestedLoopJoinProbe::NestedLoopJoinProbe( @@ -77,6 +78,50 @@ void NestedLoopJoinProbe::initialize() { joinNode_.reset(); } +void NestedLoopJoinProbe::initializeFilter( + const core::TypedExprPtr& filter, + const RowTypePtr& probeType, + const RowTypePtr& buildType) { + VELOX_CHECK_NULL(joinCondition_); + + std::vector filters = {filter}; + joinCondition_ = + std::make_unique(std::move(filters), operatorCtx_->execCtx()); + + column_index_t filterChannel = 0; + std::vector names; + std::vector types; + const auto numFields = joinCondition_->expr(0)->distinctFields().size(); + names.reserve(numFields); + types.reserve(numFields); + + for (const auto& field : joinCondition_->expr(0)->distinctFields()) { + const auto& name = field->field(); + auto channel = probeType->getChildIdxIfExists(name); + if (channel.has_value()) { + auto channelValue = channel.value(); + filterProbeProjections_.emplace_back(channelValue, filterChannel++); + names.emplace_back(probeType->nameOf(channelValue)); + types.emplace_back(probeType->childAt(channelValue)); + continue; + } + channel = buildType->getChildIdxIfExists(name); + if (channel.has_value()) { + auto channelValue = channel.value(); + filterBuildProjections_.emplace_back(channelValue, filterChannel++); + names.emplace_back(buildType->nameOf(channelValue)); + types.emplace_back(buildType->childAt(channelValue)); + continue; + } + VELOX_FAIL( + "Join filter field {} not in probe or build input, 
filter: {}", + field->toString(), + filter->toString()); + } + + filterInputType_ = ROW(std::move(names), std::move(types)); +} + BlockingReason NestedLoopJoinProbe::isBlocked(ContinueFuture* future) { switch (state_) { case ProbeOperatorState::kRunning: @@ -97,6 +142,9 @@ BlockingReason NestedLoopJoinProbe::isBlocked(ContinueFuture* future) { } VELOX_CHECK(buildVectors_.has_value()); + // If we just got build data, check if this is a right or full join where + // we need to keep track of hits on build records. If it is, initialize the + // selectivity vectors that do so. if (needsBuildMismatch(joinType_)) { buildMatched_.resize(buildVectors_->size()); for (auto i = 0; i < buildVectors_->size(); ++i) { @@ -132,9 +180,37 @@ void NestedLoopJoinProbe::addInput(RowVectorPtr input) { probeSideEmpty_ = false; } VELOX_CHECK_EQ(buildIndex_, 0); - if (needsProbeMismatch(joinType_)) { - probeMatched_.resizeFill(input_->size(), false); +} + +void NestedLoopJoinProbe::noMoreInput() { + Operator::noMoreInput(); + if (state_ != ProbeOperatorState::kRunning || input_ != nullptr) { + return; + } + if (!needsBuildMismatch(joinType_)) { + setState(ProbeOperatorState::kFinish); + return; + } + beginBuildMismatch(); +} + +bool NestedLoopJoinProbe::getBuildData(ContinueFuture* future) { + VELOX_CHECK(!buildVectors_.has_value()); + + auto buildData = + operatorCtx_->task() + ->getNestedLoopJoinBridge( + operatorCtx_->driverCtx()->splitGroupId, planNodeId()) + ->dataOrFuture(future); + if (!buildData.has_value()) { + return false; + } + + buildVectors_ = std::move(buildData); + if (buildVectors_->empty()) { + buildSideEmpty_ = true; } + return true; } RowVectorPtr NestedLoopJoinProbe::getOutput() { @@ -144,11 +220,15 @@ RowVectorPtr NestedLoopJoinProbe::getOutput() { } RowVectorPtr output{nullptr}; while (output == nullptr) { + // If we are done processing all build and probe data, and this is the + // operator producing build mismatches (only for right and full outer join). 
if (lastProbe_) { VELOX_CHECK(processingBuildMismatch()); + // Scans build input producing build mismatches by wrapping dictionaries + // to build input, and null constant to probe projections. while (output == nullptr && !hasProbedAllBuildData()) { - output = getMismatchedOutput( + output = getBuildMismatchedOutput( buildVectors_.value()[buildIndex_], buildMatched_[buildIndex_], buildOutMapping_, @@ -162,136 +242,260 @@ RowVectorPtr NestedLoopJoinProbe::getOutput() { break; } + // Need more input. if (input_ == nullptr) { break; } - // When input_ is not null but buildIndex_ is at the end, it means the - // matching of input_ and buildData_ has finished. For left/full joins, - // the next step is to emit output for mismatched probe side rows. - if (hasProbedAllBuildData()) { - output = needsProbeMismatch(joinType_) ? getMismatchedOutput( - input_, - probeMatched_, - probeOutMapping_, - identityProjections_, - buildProjections_) - : nullptr; - finishProbeInput(); - break; - } + // Generate actual join output by processing probe and build matches, and + // probe mismatches (for left joins). + output = generateOutput(); + } + return output; +} - const vector_size_t probeCnt = getNumProbeRows(); - output = doMatch(probeCnt); - if (advanceProbeRows(probeCnt)) { - if (!needsProbeMismatch(joinType_)) { - finishProbeInput(); - } +RowVectorPtr NestedLoopJoinProbe::generateOutput() { + // If addToOutput() returns false, output_ is filled. Need to produce it. + if (!addToOutput()) { + VELOX_CHECK_GT(output_->size(), 0); + return std::move(output_); + } + + if (advanceProbeRow()) { + finishProbeInput(); + } + + if (output_ != nullptr && output_->size() == 0) { + output_ = nullptr; + } + return std::move(output_); +} + +bool NestedLoopJoinProbe::advanceProbeRow() { + if (hasProbedAllBuildData()) { + ++probeRow_; + probeRowHasMatch_ = false; + buildIndex_ = 0; + + // If we finished processing the probe side. 
+ if (probeRow_ >= input_->size()) { + return true; } } - return output; + return false; } -void NestedLoopJoinProbe::initializeFilter( - const core::TypedExprPtr& filter, - const RowTypePtr& probeType, - const RowTypePtr& buildType) { - VELOX_CHECK_NULL(joinCondition_); +// Main join loop. +bool NestedLoopJoinProbe::addToOutput() { + VELOX_CHECK_NOT_NULL(input_); - std::vector filters = {filter}; - joinCondition_ = - std::make_unique(std::move(filters), operatorCtx_->execCtx()); + // First, create a new output vector. By default, allocate space for + // outputBatchSize_ rows. The output always generates dictionaries wrapped + // around the probe vector being processed. + prepareOutput(); - column_index_t filterChannel = 0; - std::vector names; - std::vector types; - auto numFields = joinCondition_->expr(0)->distinctFields().size(); - names.reserve(numFields); - types.reserve(numFields); - for (auto& field : joinCondition_->expr(0)->distinctFields()) { - const auto& name = field->field(); - auto channel = probeType->getChildIdxIfExists(name); - if (channel.has_value()) { - auto channelValue = channel.value(); - filterProbeProjections_.emplace_back(channelValue, filterChannel++); - names.emplace_back(probeType->nameOf(channelValue)); - types.emplace_back(probeType->childAt(channelValue)); + while (!hasProbedAllBuildData()) { + const auto& currentBuild = buildVectors_.value()[buildIndex_]; + + // Empty build vector; move to the next. + if (currentBuild->size() == 0) { + ++buildIndex_; + buildRow_ = 0; continue; } - channel = buildType->getChildIdxIfExists(name); - if (channel.has_value()) { - auto channelValue = channel.value(); - filterBuildProjections_.emplace_back(channelValue, filterChannel++); - names.emplace_back(buildType->nameOf(channelValue)); - types.emplace_back(buildType->childAt(channelValue)); - continue; + + // If this is a cross join, there is no filter to evaluate. 
We can just + // return the output vector directly, which is composed of the probe + // projections at `probeRow_` (as constants), and current vector of the + // build side. Also don't need to bother about adding mismatched rows. + if (joinCondition_ == nullptr) { + output_ = getNextCrossProductBatch( + currentBuild, outputType_, identityProjections_, buildProjections_); + numOutputRows_ = output_->size(); + probeRowHasMatch_ = true; + ++buildIndex_; + buildRow_ = 0; + return false; + } - VELOX_FAIL( - "Join filter field {} not in probe or build input, filter: {}", - field->toString(), - filter->toString()); + + // Only re-calculate the filter if we have a new build vector. + if (buildRow_ == 0) { + evaluateJoinFilter(currentBuild); + } + + // Iterate over the filter results. For each match, add an output record. + for (size_t i = buildRow_; i < decodedFilterResult_.size(); ++i) { + if (isJoinConditionMatch(i)) { + addOutputRow(i); + ++numOutputRows_; + probeRowHasMatch_ = true; + + // If this is a right or full join, we need to keep track of the build + // records that got a hit (key match), so that at end we know which + // build records to add and which to skip. + if (needsBuildMismatch(joinType_)) { + buildMatched_[buildIndex_].setValid(i, true); + } + + // If the buffer is full, save state and produce it as output. + if (numOutputRows_ == outputBatchSize_) { + buildRow_ = i + 1; + copyBuildValues(currentBuild); + return false; + } + } + } + + // Before moving to the next build vector, copy the needed ranges. + copyBuildValues(currentBuild); + ++buildIndex_; + buildRow_ = 0; } - filterInputType_ = ROW(std::move(names), std::move(types)); + // Check if the current probed row needs to be added as a mismatch (for left + // and full outer joins). + checkProbeMismatchRow(); + output_->resize(numOutputRows_); + + // Signals that all input has been generated for the probeRow and build + // vectors; safe to move to the next probe record. 
+ return true; } -RowVectorPtr NestedLoopJoinProbe::getMismatchedOutput( - const RowVectorPtr& data, - const SelectivityVector& matched, - BufferPtr& unmatchedMapping, - const std::vector& projections, - const std::vector& nullProjections) { - // If data is all matched or the join is a cross product, there is no - // mismatched rows. But there is an exception that if the join is a cross - // product but the build or probe side is empty, there could still be - // mismatched rows from the other side. - if (matched.isAllSelected() || - (joinCondition_ == nullptr && !probeSideEmpty_ && !buildSideEmpty_)) { - return nullptr; +void NestedLoopJoinProbe::prepareOutput() { + if (output_ != nullptr) { + return; } + std::vector localColumns(outputType_->size()); - auto rawMapping = - initializeRowNumberMapping(unmatchedMapping, data->size(), pool()); - int32_t numUnmatched{0}; - for (auto i = 0; i < data->size(); ++i) { - if (!matched.isValid(i)) { - rawMapping[numUnmatched++] = i; - } + probeIndices_ = allocateIndices(outputBatchSize_, pool()); + rawProbeIndices_ = probeIndices_->asMutable(); + + for (const auto& projection : identityProjections_) { + localColumns[projection.outputChannel] = BaseVector::wrapInDictionary( + {}, + probeIndices_, + outputBatchSize_, + input_->childAt(projection.inputChannel)); } - VELOX_CHECK_GT(numUnmatched, 0); - std::vector projectedChildren(outputType_->size()); + for (const auto& projection : buildProjections_) { + localColumns[projection.outputChannel] = BaseVector::create( + outputType_->childAt(projection.outputChannel), + outputBatchSize_, + operatorCtx_->pool()); + } + + numOutputRows_ = 0; + output_ = std::make_shared( + pool(), outputType_, nullptr, outputBatchSize_, std::move(localColumns)); +} + +void NestedLoopJoinProbe::evaluateJoinFilter(const RowVectorPtr& buildVector) { + // First step to process is to get a batch so we can evaluate the join + // filter. 
+ auto filterInput = getNextCrossProductBatch( + buildVector, + filterInputType_, + filterProbeProjections_, + filterBuildProjections_); + + if (filterInputRows_.size() != filterInput->size()) { + filterInputRows_.resizeFill(filterInput->size(), true); + } + VELOX_CHECK(filterInputRows_.isAllSelected()); + + std::vector filterResult; + EvalCtx evalCtx( + operatorCtx_->execCtx(), joinCondition_.get(), filterInput.get()); + joinCondition_->eval(0, 1, true, filterInputRows_, evalCtx, filterResult); + filterOutput_ = filterResult[0]; + decodedFilterResult_.decode(*filterOutput_, filterInputRows_); +} + +RowVectorPtr NestedLoopJoinProbe::getNextCrossProductBatch( + const RowVectorPtr& buildVector, + const RowTypePtr& outputType, + const std::vector& probeProjections, + const std::vector& buildProjections) { + std::vector projectedChildren(outputType->size()); + const auto numOutputRows = buildVector->size(); + + // Project columns from the build side. projectChildren( - projectedChildren, data, projections, numUnmatched, unmatchedMapping); - for (auto [_, outputChannel] : nullProjections) { - VELOX_CHECK_GT(projectedChildren.size(), outputChannel); - projectedChildren[outputChannel] = BaseVector::createNullConstant( - outputType_->childAt(outputChannel), numUnmatched, pool()); + projectedChildren, buildVector, buildProjections, numOutputRows, nullptr); + + // Wrap projections from the probe side as constants. + for (auto [inputChannel, outputChannel] : probeProjections) { + projectedChildren[outputChannel] = BaseVector::wrapInConstant( + numOutputRows, probeRow_, input_->childAt(inputChannel)); } + return std::make_shared( - pool(), outputType_, nullptr, numUnmatched, std::move(projectedChildren)); + pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren)); +} + +void NestedLoopJoinProbe::addOutputRow(vector_size_t buildRow) { + // Probe side is always a dictionary; just populate the index. 
+ rawProbeIndices_[numOutputRows_] = probeRow_; + + // For the build side, we accumulate the ranges to copy, then copy all of them + // at once. If records are consecutive, they are merged into a single range. + if (!buildCopyRanges_.empty() && + (buildCopyRanges_.back().sourceIndex + buildCopyRanges_.back().count) == + buildRow) { + ++buildCopyRanges_.back().count; + } else { + buildCopyRanges_.push_back({buildRow, numOutputRows_, 1}); + } +} + +void NestedLoopJoinProbe::copyBuildValues(const RowVectorPtr& buildVector) { + if (!buildCopyRanges_.empty()) { + for (const auto& projection : buildProjections_) { + const auto& buildChild = buildVector->childAt(projection.inputChannel); + const auto& outputChild = output_->childAt(projection.outputChannel); + outputChild->copyRanges(buildChild.get(), buildCopyRanges_); + } + buildCopyRanges_.clear(); + } +} + +void NestedLoopJoinProbe::addProbeMismatchRow() { + // Probe side is always a dictionary; just populate the index. + rawProbeIndices_[numOutputRows_] = probeRow_; + + // Null out build projections. + for (const auto& projection : buildProjections_) { + const auto& outputChild = output_->childAt(projection.outputChannel); + outputChild->setNull(numOutputRows_, true); + } +} + +void NestedLoopJoinProbe::checkProbeMismatchRow() { + // If we are processing the last batch of the build side, check if we need + // to add a probe mismatch record. 
+ if (needsProbeMismatch(joinType_) && hasProbedAllBuildData() && + !probeRowHasMatch_) { + addProbeMismatchRow(); + ++numOutputRows_; + } } void NestedLoopJoinProbe::finishProbeInput() { VELOX_CHECK_NOT_NULL(input_); input_.reset(); buildIndex_ = 0; + probeRow_ = 0; + if (!noMoreInput_) { return; } - if (!needsBuildMismatch(joinType_) || buildSideEmpty_) { - setState(ProbeOperatorState::kFinish); - return; - } - beginBuildMismatch(); -} -void NestedLoopJoinProbe::noMoreInput() { - Operator::noMoreInput(); - if (state_ != ProbeOperatorState::kRunning || input_ != nullptr) { - return; - } - if (!needsBuildMismatch(joinType_)) { + // From now on we finished processing the probe side. Check now if this is a + // right or full outer join, and hence we may need to start emitting build + // mismatch records. + if (!needsBuildMismatch(joinType_) || buildSideEmpty_) { setState(ProbeOperatorState::kFinish); return; } @@ -301,6 +505,8 @@ void NestedLoopJoinProbe::noMoreInput() { void NestedLoopJoinProbe::beginBuildMismatch() { VELOX_CHECK(needsBuildMismatch(joinType_)); + // Check the state of peer operators. Only the last driver (operator) running + // this code will survive and move on to process build mismatches. std::vector promises; std::vector> peers; if (!operatorCtx_->task()->allPeersFinished( @@ -311,8 +517,11 @@ void NestedLoopJoinProbe::beginBuildMismatch() { } lastProbe_ = true; + // From now on, buildIndex_ is used to index into buildMatched_. VELOX_CHECK_EQ(buildIndex_, 0); + + // Collect and merge the build mismatch selectivity vectors from all peers. 
for (auto& peer : peers) { auto* op = peer->findOperator(planNodeId()); auto* probe = dynamic_cast(op); @@ -331,185 +540,41 @@ void NestedLoopJoinProbe::beginBuildMismatch() { } } -bool NestedLoopJoinProbe::getBuildData(ContinueFuture* future) { - VELOX_CHECK(!buildVectors_.has_value()); - - auto buildData = - operatorCtx_->task() - ->getNestedLoopJoinBridge( - operatorCtx_->driverCtx()->splitGroupId, planNodeId()) - ->dataOrFuture(future); - if (!buildData.has_value()) { - return false; - } - - buildVectors_ = std::move(buildData); - if (buildVectors_->empty()) { - buildSideEmpty_ = true; - } - return true; -} - -vector_size_t NestedLoopJoinProbe::getNumProbeRows() const { - VELOX_CHECK_NOT_NULL(input_); - VELOX_CHECK(!hasProbedAllBuildData()); - - const auto inputSize = input_->size(); - auto numBuildRows = buildVectors_.value()[buildIndex_]->size(); - vector_size_t numProbeRows; - if (numBuildRows > outputBatchSize_) { - numProbeRows = 1; - } else { - numProbeRows = std::min( - (vector_size_t)outputBatchSize_ / numBuildRows, inputSize - probeRow_); - } - return numProbeRows; -} - -RowVectorPtr NestedLoopJoinProbe::getCrossProduct( - vector_size_t probeCnt, - const RowTypePtr& outputType, - const std::vector& probeProjections, - const std::vector& buildProjections) { - VELOX_CHECK_GT(probeCnt, 0); - VELOX_CHECK(!hasProbedAllBuildData()); - - const auto buildSize = buildVectors_.value()[buildIndex_]->size(); - const auto numOutputRows = probeCnt * buildSize; - const bool probeCntChanged = (probeCnt != numPrevProbedRows_); - numPrevProbedRows_ = probeCnt; - - auto rawProbeIndices = - initializeRowNumberMapping(probeIndices_, numOutputRows, pool()); - for (auto i = 0; i < probeCnt; ++i) { - std::fill( - rawProbeIndices.begin() + i * buildSize, - rawProbeIndices.begin() + (i + 1) * buildSize, - probeRow_ + i); - } - - if (probeCntChanged) { - auto rawBuildIndices_ = - initializeRowNumberMapping(buildIndices_, numOutputRows, pool()); - for (auto i = 0; i < probeCnt; 
++i) { - std::iota( - rawBuildIndices_.begin() + i * buildSize, - rawBuildIndices_.begin() + (i + 1) * buildSize, - 0); - } - } - - std::vector projectedChildren(outputType->size()); - projectChildren( - projectedChildren, - input_, - probeProjections, - numOutputRows, - probeIndices_); - projectChildren( - projectedChildren, - buildVectors_.value()[buildIndex_], - buildProjections, - numOutputRows, - buildIndices_); - - return std::make_shared( - pool(), outputType, nullptr, numOutputRows, std::move(projectedChildren)); -} - -bool NestedLoopJoinProbe::advanceProbeRows(vector_size_t probeCnt) { - probeRow_ += probeCnt; - if (probeRow_ < input_->size()) { - return false; - } - probeRow_ = 0; - numPrevProbedRows_ = 0; - do { - ++buildIndex_; - } while (!hasProbedAllBuildData() && - !buildVectors_.value()[buildIndex_]->size()); - return hasProbedAllBuildData(); -} - -RowVectorPtr NestedLoopJoinProbe::doMatch(vector_size_t probeCnt) { - VELOX_CHECK_NOT_NULL(input_); - VELOX_CHECK(!hasProbedAllBuildData()); - - if (joinCondition_ == nullptr) { - return getCrossProduct( - probeCnt, outputType_, identityProjections_, buildProjections_); - } - - auto filterInput = getCrossProduct( - probeCnt, - filterInputType_, - filterProbeProjections_, - filterBuildProjections_); - - if (filterInputRows_.size() != filterInput->size()) { - filterInputRows_.resizeFill(filterInput->size(), true); +RowVectorPtr NestedLoopJoinProbe::getBuildMismatchedOutput( + const RowVectorPtr& data, + const SelectivityVector& matched, + BufferPtr& unmatchedMapping, + const std::vector& projections, + const std::vector& nullProjections) { + // If data is all matched or the join is a cross product, there is no + // mismatched rows. But there is an exception that if the join is a cross + // product but the build or probe side is empty, there could still be + // mismatched rows from the other side. 
+ if (matched.isAllSelected() || + (joinCondition_ == nullptr && !probeSideEmpty_ && !buildSideEmpty_)) { + return nullptr; } - VELOX_CHECK(filterInputRows_.isAllSelected()); - std::vector filterResult; - EvalCtx evalCtx( - operatorCtx_->execCtx(), joinCondition_.get(), filterInput.get()); - joinCondition_->eval(0, 1, true, filterInputRows_, evalCtx, filterResult); - DecodedVector decodedFilterResult; - decodedFilterResult.decode(*filterResult[0], filterInputRows_); - - const vector_size_t maxOutputRows = decodedFilterResult.size(); - auto rawProbeOutMapping = - initializeRowNumberMapping(probeOutMapping_, maxOutputRows, pool()); - auto rawBuildOutMapping = - initializeRowNumberMapping(buildOutMapping_, maxOutputRows, pool()); - auto* probeIndices = probeIndices_->asMutable(); - auto* buildIndices = buildIndices_->asMutable(); - int32_t numOutputRows{0}; - for (auto i = 0; i < maxOutputRows; ++i) { - if (!decodedFilterResult.isNullAt(i) && - decodedFilterResult.valueAt(i)) { - rawProbeOutMapping[numOutputRows] = probeIndices[i]; - rawBuildOutMapping[numOutputRows] = buildIndices[i]; - ++numOutputRows; - } - } - if (needsProbeMismatch(joinType_)) { - for (auto i = 0; i < numOutputRows; ++i) { - probeMatched_.setValid(rawProbeOutMapping[i], true); - } - probeMatched_.updateBounds(); - } - if (needsBuildMismatch(joinType_)) { - for (auto i = 0; i < numOutputRows; ++i) { - buildMatched_[buildIndex_].setValid(rawBuildOutMapping[i], true); + auto rawMapping = + initializeRowNumberMapping(unmatchedMapping, data->size(), pool()); + int32_t numUnmatched{0}; + for (auto i = 0; i < data->size(); ++i) { + if (!matched.isValid(i)) { + rawMapping[numUnmatched++] = i; } } - - if (numOutputRows == 0) { - return nullptr; - } + VELOX_CHECK_GT(numUnmatched, 0); std::vector projectedChildren(outputType_->size()); projectChildren( - projectedChildren, - input_, - identityProjections_, - numOutputRows, - probeOutMapping_); - projectChildren( - projectedChildren, - 
buildVectors_.value()[buildIndex_], - buildProjections_, - numOutputRows, - buildOutMapping_); - + projectedChildren, data, projections, numUnmatched, unmatchedMapping); + for (auto [_, outputChannel] : nullProjections) { + VELOX_CHECK_GT(projectedChildren.size(), outputChannel); + projectedChildren[outputChannel] = BaseVector::createNullConstant( + outputType_->childAt(outputChannel), numUnmatched, pool()); + } return std::make_shared( - pool(), - outputType_, - nullptr, - numOutputRows, - std::move(projectedChildren)); + pool(), outputType_, nullptr, numUnmatched, std::move(projectedChildren)); } } // namespace facebook::velox::exec diff --git a/velox/exec/NestedLoopJoinProbe.h b/velox/exec/NestedLoopJoinProbe.h index 4d5d4db6193..0f75f910b30 100644 --- a/velox/exec/NestedLoopJoinProbe.h +++ b/velox/exec/NestedLoopJoinProbe.h @@ -20,6 +20,39 @@ #include "velox/exec/ProbeOperatorState.h" namespace facebook::velox::exec { + +/// Implements a Nested Loop Join (NLJ) between records from the probe (input_) +/// and build (NestedLoopJoinBridge) sides. It supports inner, left, right and +/// full outer joins. +/// +/// This class is generally useful to evaluate non-equi-joins (e.g. "k1 >= k2"), +/// when join conditions may need to be evaluated against a full cross product +/// of the input. +/// +/// The output follows the order of the probe side rows (for inner and left +/// joins). All build vectors are materialized upfront (check buildVectors_), +/// but probe batches are processed one-by-one as a stream. +/// +/// To produce output, the operator processes each probe record from probe +/// input, using the following steps: +/// +/// 1. Materialize a cross product by wrapping each probe record (as a constant) +/// to each build vector. +/// 2. Evaluate the join condition. +/// 3. Add key matches to the output. +/// 4. Once all build vectors are processed for a particular probe row, check if +/// a probe mismatch is needed (only for left and full outer joins). +/// 5. 
Once all probe and build inputs are processed, check if build mismatches +/// are needed (only for right and full outer joins). +/// 6. If so, signal other peer operators; only a single operator instance will +/// collect all build matches at the end, and emit any records that haven't +/// been matched by any of the peers. +/// +/// The output always contains dictionaries wrapped around probe columns, and +/// copies for build columns. The build-side copies are done lazily; it first +/// accumulates the ranges to be copied, then performs the copies in batch, +/// column-by-column. It produces at most `outputBatchSize_` records, but it may +/// produce fewer since the output needs to follow the probe vector boundaries. class NestedLoopJoinProbe : public Operator { public: NestedLoopJoinProbe( @@ -56,38 +89,109 @@ class NestedLoopJoinProbe : public Operator { const RowTypePtr& leftType, const RowTypePtr& rightType); + // Materializes build data from nested loop join bridge into `buildVectors_`. + // Returns whether the data has been materialized and is ready for use. Nested + // loop join requires all build data to be materialized and available in + // `buildVectors_` before it can produce output. bool getBuildData(ContinueFuture* future); - // Calculates the number of probe rows to match with the build side vectors - // given the output batch size limit. - vector_size_t getNumProbeRows() const; + // Generates output from join matches between probe and build sides, as well + // as probe mismatches (for left and full outer joins). As much as possible, + // generates outputs `outputBatchSize_` records at a time, but batches may be + // smaller in some cases - outputs follow the probe side buffer boundaries. + RowVectorPtr generateOutput(); + + // Fill in joined output to `output_` by matching the current probeRow_ and + // successive build vectors (using getNextCrossProductBatch()). 
Stops when + // either all build vectors were matched for the current probeRow (returns + // true), or if the output is full (returns false). If it returns false, a + // valid vector with more than zero records will be available at `output_`; if + // it returns true, either nullptr or zero records may be placed at `output_`. + // + // Also updates `buildMatched_` with the build records that received a match, + // so that they can be used to implement right and full outer join semantics + // once all probe data has been processed. + bool addToOutput(); + + // Advances 'probeRow_' and resets required state information. Returns true + // if there is no more probe data to be processed in the current `input_` + // (and hence a new probe input is required). False otherwise. + bool advanceProbeRow(); + + // Ensures a new batch of records is available at `output_` and ready to + // receive rows. Batches have space for `outputBatchSize_` rows. + void prepareOutput(); - // Generates cross product of next 'probeCnt' rows of input_, and all rows of - // build side vector at 'buildIndex_' in 'buildData_'. - // 'outputType' specifies the type of output. - // Projections from input_ and buildData_ to the output are specified by - // 'probeProjections' and 'buildProjections' respectively. Caller is - // responsible for ensuring all columns in outputType is contained in either + // Evaluates the joinCondition for a given build vector. This method sets + // `filterOutput_` and `decodedFilterResult_`, which will be ready to be used + // by `isJoinConditionMatch(buildRow)` below. + void evaluateJoinFilter(const RowVectorPtr& buildVector); + + // Checks if the join condition matched for a particular row. + bool isJoinConditionMatch(vector_size_t i) const { + return ( + !decodedFilterResult_.isNullAt(i) && + decodedFilterResult_.valueAt(i)); + } + + // Generates the next batch of a cross product between probe and build. 
It + // uses the current probe record being processed (`probeRow_` from `input_`) + // for probe projections, and the columns from buildVector for build // projections. - // TODO: consider consolidate the routine of producing cartesian product that - // can be reused at MergeJoin::addToOutput - RowVectorPtr getCrossProduct( - vector_size_t probeCnt, + // + // Output projections can be specified so that this function can be used to + // generate both filter input and actual output (in case there is no join + // filter - cross join). + RowVectorPtr getNextCrossProductBatch( + const RowVectorPtr& buildVector, const RowTypePtr& outputType, const std::vector& probeProjections, const std::vector& buildProjections); - // Evaluates joinCondition against the output of getCrossProduct(probeCnt), - // returns the result that passed joinCondition, updates probeMatched_, - // buildMatched_ accordingly. - RowVectorPtr doMatch(vector_size_t probeCnt); + // Add a single record to `output_` based on buildRow from buildVector, and + // the current probeRow and probe vector (input_). Probe side projections are + // zero-copy (dictionary indices), and build side projections are marked to be + // copied using `buildCopyRanges_`; they will be copied later on by + // `copyBuildValues()`. + void addOutputRow(vector_size_t buildRow); + + // Checks if it is required to add a probe mismatch row, and does it if + // needed. The caller needs to ensure there is available space in `output_` + // for the new record, which has nulled out build projections. + void checkProbeMismatchRow(); - // Updates 'probeRow_' and 'buildIndex_' by advancing 'probeRow_' by probeCnt. - // Returns true if 'buildIndex_' points to the end of 'buildData_'. - bool advanceProbeRows(vector_size_t probeCnt); + // Add a probe mismatch (only for left/full outer joins). The record is based + // on the current probeRow and vector (input_) and build projections are null. 
+ void addProbeMismatchRow(); + // Copies the ranges from buildVector specified by `buildCopyRanges_` to + // `output_`, one projected column at a time. Clears buildCopyRanges_. + void copyBuildValues(const RowVectorPtr& buildVector); + + // Called when we are done processing the current probe batch, to signal we + // are ready for the next one. + // + // If this is the last probe batch (and this is a right or full outer join), + // change the operator state to signal peers. + void finishProbeInput(); + + // When doing right/full joins, all but the last probe operator that finished + // matching probe-side input will turn into kFinish state. + // The last finishing operator will gather buildMatched from all the other + // probe operators to emit output for mismatched build side rows. + void beginBuildMismatch(); + + // If this is the operator producing build mismatches (only after producing + // all matches and probe mismatches). + bool processingBuildMismatch() const { + return state_ == ProbeOperatorState::kRunning && input_ == nullptr && + noMoreInput_; + } + + // Whether we have processed all build data for the current probe row (based + // on buildIndex_'s value). bool hasProbedAllBuildData() const { - return (buildIndex_ == buildVectors_.value().size()); + return (buildIndex_ >= buildVectors_.value().size()); } // Wraps rows of 'data' that are not selected in 'matched' and projects @@ -95,74 +199,105 @@ class NestedLoopJoinProbe : public Operator { // create null column vectors in output for outer join. 'unmatchedMapping' is // the reusable buffer to record the mismatched row numbers for output // projections. 
- RowVectorPtr getMismatchedOutput( + RowVectorPtr getBuildMismatchedOutput( const RowVectorPtr& data, const SelectivityVector& matched, BufferPtr& unmatchedMapping, const std::vector& projections, const std::vector& nullProjections); - void finishProbeInput(); - - // When doing right/full joins, all but the last probe operators that finished - // matching and probe-side mismatch output, will turn into kFinish state. - // The last finishing operator will gather buildMatched from all the other - // probe operators to emit output for mismatched build side rows. - void beginBuildMismatch(); - - bool processingBuildMismatch() const { - return state_ == ProbeOperatorState::kRunning && input_ == nullptr && - noMoreInput_; - } - // TODO: Add state transition check. void setState(ProbeOperatorState state) { state_ = state; } private: + // Output buffer members. + // Maximum number of rows in the output batch. const uint32_t outputBatchSize_; + + // The current output batch being populated. + RowVectorPtr output_; + + // Number of output rows in the current output batch. + vector_size_t numOutputRows_{0}; + + // Dictionary indices for probe columns. + BufferPtr probeIndices_; + vector_size_t* rawProbeIndices_; + + // Join condition expression. + + // May be nullptr for a cross join. + std::unique_ptr joinCondition_; + + // Input type for the join condition expression. + RowTypePtr filterInputType_; + + // Join condition evaluation state that needs to be persisted across the + // generation of successive output buffers. + SelectivityVector filterInputRows_; + VectorPtr filterOutput_; + DecodedVector decodedFilterResult_; + + // Join metadata and state.
std::shared_ptr joinNode_; const core::JoinType joinType_; ProbeOperatorState state_{ProbeOperatorState::kWaitForBuild}; ContinueFuture future_{ContinueFuture::makeEmpty()}; - // Join condition-related state - std::unique_ptr joinCondition_; - RowTypePtr filterInputType_; - SelectivityVector filterInputRows_; + // Probe side state. - // Probe side state - // Input row to process on next call to getOutput(). + // Probe row being currently processed (related to `input_`). vector_size_t probeRow_{0}; - // Records the number of probed rows in last getCrossProduct() call. It is - // reset upon each 'buildIndex_' update. - vector_size_t numPrevProbedRows_{0}; + + // Whether the current probeRow_ has produced a match. Used for left and full + // outer joins. + bool probeRowHasMatch_{false}; + + // Controls if this is the operator gathering and producing right/full outer + // join mismatches. This is only set after all probe and build data has been + // processed, only for right/full outer joins, and only executed in one single + // operator (need to wait until all peers are finished). bool lastProbe_{false}; - // Represents whether probe side rows have been matched. - SelectivityVector probeMatched_; - std::vector filterProbeProjections_; - BufferPtr probeOutMapping_; - BufferPtr probeIndices_; - // Indicate if the probe side has empty input or not. For the last prober, + + // Indicate if the probe side has empty input or not. For the last probe, // this indicates if all the probe sides are empty or not. This flag is used // for mismatched output producing. bool probeSideEmpty_{true}; - // Build side state + // Build side state. + + // Stores the data for build vectors (right side of the join). std::optional> buildVectors_; bool buildSideEmpty_{false}; - // Index into buildData_ for the build side vector to process on next call to - // getOutput(). + + // Index into `buildVectors_` for the build vector being currently processed.
size_t buildIndex_{0}; - std::vector buildProjections_; - BufferPtr buildIndices_; - // Represents whether probe build rows have been matched. + // Row being currently processed from `buildVectors_[buildIndex_]`. + vector_size_t buildRow_{0}; + + // Keep track of the build rows that had matches (only used for right or full + // outer joins). std::vector buildMatched_; + + // Stores the ranges of build values to be copied to the output vector (we + // batch them and copy once, instead of copying them row-by-row). + std::vector buildCopyRanges_; + + // List of output projections from the build side. Note that the list of + // projections from the probe side is available at `identityProjections_`. + std::vector buildProjections_; + + // Projections needed as input to the filter to evaluate join filter + // conditions. Note that if this is a cross-join, filter projections are the + // same as output projections. + std::vector filterProbeProjections_; std::vector filterBuildProjections_; + BufferPtr buildOutMapping_; }; diff --git a/velox/exec/tests/NestedLoopJoinTest.cpp b/velox/exec/tests/NestedLoopJoinTest.cpp index 4a4b2211ca0..98c6227ed4c 100644 --- a/velox/exec/tests/NestedLoopJoinTest.cpp +++ b/velox/exec/tests/NestedLoopJoinTest.cpp @@ -19,9 +19,10 @@ #include "velox/exec/tests/utils/VectorTestUtil.h" #include "velox/vector/fuzzer/VectorFuzzer.h" -using namespace facebook::velox; -using namespace facebook::velox::exec; -using namespace facebook::velox::exec::test; +namespace facebook::velox::exec::test { +namespace { + +using facebook::velox::test::assertEqualVectors; class NestedLoopJoinTest : public HiveConnectorTestBase { protected: @@ -70,16 +71,27 @@ class NestedLoopJoinTest : public HiveConnectorTestBase { const std::vector& buildVectors) { runTest(probeVectors, buildVectors, 1); runTest(probeVectors, buildVectors, 4); + runTest( + probeVectors, + buildVectors, + 4, + 4); // Run with smaller output batch size.
} void runTest( const std::vector& probeVectors, const std::vector& buildVectors, - int32_t numDrivers) { + int32_t numDrivers, + size_t preferredOutputBatchSize = 1024) { createDuckDbTable("t", probeVectors); createDuckDbTable("u", buildVectors); + auto queryCtx = core::QueryCtx::create(executor_.get()); CursorParameters params; + params.queryCtx = queryCtx; + params.queryCtx->testingOverrideConfigUnsafe( + {{core::QueryConfig::kPreferredOutputBatchRows, + std::to_string(preferredOutputBatchSize)}}); params.maxDrivers = numDrivers; auto planNodeIdGenerator = std::make_shared(); @@ -123,7 +135,8 @@ class NestedLoopJoinTest : public HiveConnectorTestBase { core::JoinType::kInner, core::JoinType::kLeft, core::JoinType::kRight, - core::JoinType::kFull}; + core::JoinType::kFull, + }; std::vector outputLayout_{probeKeyName_, buildKeyName_}; std::string joinConditionStr_{probeKeyName_ + " {} " + buildKeyName_}; std::string queryStr_{fmt::format( @@ -448,8 +461,8 @@ TEST_F(NestedLoopJoinTest, zeroColumnBuild) { } TEST_F(NestedLoopJoinTest, bigintArray) { - auto probeVectors = makeBatches(1000, 5, probeType_, pool_.get()); - auto buildVectors = makeBatches(900, 5, buildType_, pool_.get()); + auto probeVectors = makeBatches(100, 5, probeType_, pool_.get()); + auto buildVectors = makeBatches(90, 5, buildType_, pool_.get()); setComparisons({"="}); setJoinTypes({core::JoinType::kFull}); runSingleAndMultiDriverTest(probeVectors, buildVectors); @@ -488,3 +501,72 @@ TEST_F(NestedLoopJoinTest, allTypes) { "SELECT t0, u0 FROM t {0} JOIN u ON t.t0 {1} u0 AND t1 {1} u1 AND t2 {1} u2 AND t3 {1} u3 AND t4 {1} u4 AND t5 {1} u5 AND t6 {1} u6"); runSingleAndMultiDriverTest(probeVectors, buildVectors); } + +// Ensures output order follows the probe input order for inner and left joins. 
+TEST_F(NestedLoopJoinTest, outputOrder) { + auto probeVectors = makeRowVector( + {"l1", "l2"}, + { + makeNullableFlatVector({1, 8, 6, std::nullopt, 7, 4}), + makeFlatVector({"a", "b", "c", "d", "e", "f"}), + }); + auto buildVector1 = makeRowVector( + {"r1", "r2"}, + { + makeNullableFlatVector({4, 6, 1}), + makeFlatVector({"z", "x", "y"}), + }); + + auto buildVector2 = makeRowVector( + {"r1", "r2"}, + { + makeNullableFlatVector({10, std::nullopt, 6}), + makeFlatVector({"z", "p", "u"}), + }); + + const auto createPlan = [&](core::JoinType joinType) { + auto planNodeIdGenerator = std::make_shared(); + return PlanBuilder(planNodeIdGenerator) + .values({probeVectors}) + .nestedLoopJoin( + PlanBuilder(planNodeIdGenerator) + .values({buildVector1, buildVector2}) + .project({"r1", "r2"}) + .planNode(), + "l1 < r1", + {"l1", "l2", "r1", "r2"}, + joinType) + .planNode(); + }; + + // Inner. + auto results = AssertQueryBuilder(createPlan(core::JoinType::kInner)) + .copyResults(pool()); + auto expectedInner = makeRowVector({ + makeNullableFlatVector({1, 1, 1, 1, 8, 6, 7, 4, 4, 4}), + makeFlatVector( + {"a", "a", "a", "a", "b", "c", "e", "f", "f", "f"}), + makeNullableFlatVector({4, 6, 10, 6, 10, 10, 10, 6, 10, 6}), + makeFlatVector( + {"z", "x", "z", "u", "z", "z", "z", "x", "z", "u"}), + }); + assertEqualVectors(expectedInner, results); + + // Left. + results = + AssertQueryBuilder(createPlan(core::JoinType::kLeft)).copyResults(pool()); + auto expectedLeft = makeRowVector({ + makeNullableFlatVector( + {1, 1, 1, 1, 8, 6, std::nullopt, 7, 4, 4, 4}), + makeNullableFlatVector( + {"a", "a", "a", "a", "b", "c", "d", "e", "f", "f", "f"}), + makeNullableFlatVector( + {4, 6, 10, 6, 10, 10, std::nullopt, 10, 6, 10, 6}), + makeNullableFlatVector( + {"z", "x", "z", "u", "z", "z", std::nullopt, "z", "x", "z", "u"}), + }); + assertEqualVectors(expectedLeft, results); +} + +} // namespace +} // namespace facebook::velox::exec::test