From def39768547461ede18233b430f17e7bbfa02c0f Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Wed, 11 Jun 2025 17:23:13 +0800 Subject: [PATCH 1/5] Fix the semi join result mismatch issue with filter and multi duplicated rows --- velox/exec/MergeJoin.cpp | 54 +++++++++++++++++++----------- velox/exec/MergeJoin.h | 16 +++++++-- velox/exec/tests/MergeJoinTest.cpp | 52 ++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 22 deletions(-) diff --git a/velox/exec/MergeJoin.cpp b/velox/exec/MergeJoin.cpp index 550c059cc81f..6ef1b6047eb6 100644 --- a/velox/exec/MergeJoin.cpp +++ b/velox/exec/MergeJoin.cpp @@ -90,7 +90,9 @@ void MergeJoin::initialize() { initializeFilter(joinNode_->filter(), leftType, rightType); if (joinNode_->isLeftJoin() || joinNode_->isAntiJoin() || - joinNode_->isRightJoin() || joinNode_->isFullJoin()) { + joinNode_->isRightJoin() || joinNode_->isFullJoin() || + joinNode_->isLeftSemiFilterJoin() || + joinNode_->isRightSemiFilterJoin()) { joinTracker_ = JoinTracker(outputBatchSize_, pool()); } } else if (joinNode_->isAntiJoin()) { @@ -421,7 +423,7 @@ bool MergeJoin::tryAddOutputRow( filterRightInputProjections_); if (joinTracker_) { - if (isRightJoin(joinType_)) { + if (isRightJoin(joinType_) || isRightSemiFilterJoin(joinType_)) { // Record right-side row with a match on the left-side. joinTracker_->addMatch(rightBatch, rightRow, outputSize_); } else { @@ -613,8 +615,9 @@ bool MergeJoin::addToOutputForLeftJoin() { // one match on the other side, we could explore specialized algorithms // or data structures that short-circuit the join process once a match // is found. - for (size_t r = isLeftSemiFilterJoin(joinType_) ? numRightBatches - 1 - : firstRightBatch; + for (size_t r = (isLeftSemiFilterJoin(joinType_) && !filter_) + ? numRightBatches - 1 + : firstRightBatch; r < numRightBatches; ++r) { const auto rightBatch = rightMatch_->inputs[r]; @@ -622,7 +625,7 @@ bool MergeJoin::addToOutputForLeftJoin() { const auto rightEndRow = r == numRightBatches - 1 ? rightMatch_->endRowIndex : rightBatch->size(); - if (isLeftSemiFilterJoin(joinType_)) { + if (isLeftSemiFilterJoin(joinType_) && !filter_) { rightStartRow = rightEndRow - 1; } if (prepareOutput(leftBatch, rightBatch)) { @@ -693,8 +696,9 @@ bool MergeJoin::addToOutputForRightJoin() { // one match on the other side, we could explore specialized algorithms // or data structures that short-circuit the join process once a match // is found. - for (size_t l = isRightSemiFilterJoin(joinType_) ? numLeftBatches - 1 - : firstLeftBatch; + for (size_t l = (isRightSemiFilterJoin(joinType_) && !filter_) + ? numLeftBatches - 1 + : firstLeftBatch; l < numLeftBatches; ++l) { const auto leftBatch = leftMatch_->inputs[l]; @@ -702,7 +706,7 @@ bool MergeJoin::addToOutputForRightJoin() { const auto leftEndRow = l == numLeftBatches - 1 ? leftMatch_->endRowIndex : leftBatch->size(); - if (isRightSemiFilterJoin(joinType_)) { + if (isRightSemiFilterJoin(joinType_) && !filter_) { // RightSemiFilter produce each row from the right at most once. leftStartRow = leftEndRow - 1; } @@ -818,7 +822,7 @@ RowVectorPtr MergeJoin::getOutput() { continue; } else if (isAntiJoin(joinType_)) { output = filterOutputForAntiJoin(output); - if (output) { + if (output != nullptr && output->size() > 0) { return output; } @@ -1274,7 +1278,8 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { // If all matches for a given left-side row fail the filter, add a row to // the output with nulls for the right-side columns. const auto onMiss = [&](auto row) { - if (isAntiJoin(joinType_)) { + if (isAntiJoin(joinType_) || isLeftSemiFilterJoin(joinType_) || + isRightSemiFilterJoin(joinType_)) { return; } rawIndices[numPassed++] = row; @@ -1346,18 +1351,26 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { } }; + auto onMatch = [&](auto row) { + if (isLeftSemiFilterJoin(joinType_) || isRightSemiFilterJoin(joinType_)) { + rawIndices[numPassed++] = row; + } + }; + for (auto i = 0; i < numRows; ++i) { if (filterRows.isValid(i)) { const bool passed = !decodedFilterResult_.isNullAt(i) && decodedFilterResult_.valueAt(i); - joinTracker_->processFilterResult(i, passed, onMiss); + joinTracker_->processFilterResult(i, passed, onMiss, onMatch); if (isAntiJoin(joinType_)) { if (!passed) { rawIndices[numPassed++] = i; } - } else { + } else if ( + !isLeftSemiFilterJoin(joinType_) && + !isRightSemiFilterJoin(joinType_)) { if (passed) { rawIndices[numPassed++] = i; } @@ -1371,19 +1384,19 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { // Every time we start a new left key match, `processFilterResult()` will // check if at least one row from the previous match passed the filter. If - // none did, it calls onMiss to add a record with null right projections to - // the output. + // none did, it calls onMiss to add a record with null right projections + // to the output. // // Before we leave the current buffer, since we may not have seen the next - // left key match yet, the last key match may still be pending to produce a - // row (because `processFilterResult()` was not called yet). + // left key match yet, the last key match may still be pending to produce + // a row (because `processFilterResult()` was not called yet). // // To handle this, we need to call `noMoreFilterResults()` unless the - // same current left key match may continue in the next buffer. So there are - // two cases to check: + // same current left key match may continue in the next buffer. So there + // are two cases to check: // - // 1. If leftMatch_ is nullopt, there for sure the next buffer will contain - // a different key match. + // 1. If leftMatch_ is nullopt, there for sure the next buffer will + // contain a different key match. // // 2. leftMatch_ may not be nullopt, but may be related to a different // (subsequent) left key. So we check if the last row in the batch has the @@ -1391,6 +1404,7 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { if (!leftMatch_ || !joinTracker_->isCurrentLeftMatch(numRows - 1)) { joinTracker_->noMoreFilterResults(onMiss); } + } else { filterRows_.resize(numRows); filterRows_.setAll(); diff --git a/velox/exec/MergeJoin.h b/velox/exec/MergeJoin.h index 47dea482554d..f0dd11fcfd27 100644 --- a/velox/exec/MergeJoin.h +++ b/velox/exec/MergeJoin.h @@ -394,11 +394,12 @@ class MergeJoin : public Operator { // rows that correspond to a single left-side row. Use // 'noMoreFilterResults' to make sure 'onMiss' is called for the last // left-side row. - template + template void processFilterResult( vector_size_t outputIndex, bool passed, - TOnMiss onMiss) { + TOnMiss onMiss, + TOnMatch onMatch) { const auto rowNumber = rawLeftRowNumbers_[outputIndex]; if (currentLeftRowNumber_ != rowNumber) { if (currentRow_ != -1 && !currentRowPassed_) { @@ -407,12 +408,18 @@ class MergeJoin : public Operator { currentRow_ = outputIndex; currentLeftRowNumber_ = rowNumber; currentRowPassed_ = false; + firstMatched_ = false; } else { currentRow_ = outputIndex; } if (passed) { currentRowPassed_ = true; + + if (!firstMatched_) { + onMatch(outputIndex); + firstMatched_ = true; + } } } @@ -434,6 +441,7 @@ class MergeJoin : public Operator { currentRow_ = -1; currentRowPassed_ = false; + firstMatched_ = false; } void reset(); @@ -470,6 +478,10 @@ class MergeJoin : public Operator { // True if at least one row in a block of output rows corresponding a single // left-side row identified by 'currentRowNumber' passed the filter. bool currentRowPassed_{false}; + + // Retains only the first matching record for a semi join in scenarios + // involving filters. + bool firstMatched_{false}; }; /// Used to record both left and right join. diff --git a/velox/exec/tests/MergeJoinTest.cpp b/velox/exec/tests/MergeJoinTest.cpp index 5714301a2f3b..c31d676c5e0a 100644 --- a/velox/exec/tests/MergeJoinTest.cpp +++ b/velox/exec/tests/MergeJoinTest.cpp @@ -1016,6 +1016,58 @@ TEST_F(MergeJoinTest, semiJoinWithMultipleMatchVectors) { core::JoinType::kLeftSemiFilter); } +TEST_F(MergeJoinTest, semiJoinWithMultiMatchedRowsWithFilter) { + auto left = makeRowVector( + {"t0", "t1"}, + {makeNullableFlatVector({2, 2, 2, 2, 2}), + makeNullableFlatVector({3, 2, 3, 2, 2})}); + + auto right = makeRowVector( + {"u0", "u1"}, + {makeNullableFlatVector({2, 2, 2, 2, 2, 2}), + makeNullableFlatVector({2, 2, 2, 2, 2, 4})}); + + createDuckDbTable("t", {left}); + createDuckDbTable("u", {right}); + + auto testSemiJoin = [&](const std::string& filter, + const std::string& sql, + const std::vector& outputLayout, + core::JoinType joinType) { + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values(split(left, 2)) + .mergeJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator) + .values(split(right, 2)) + .planNode(), + filter, + outputLayout, + joinType) + .planNode(); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .assertResults(sql); + }; + + // Left Semi join With filter + testSemiJoin( + "t1 > u1", + "SELECT t0, t1 FROM t where t0 IN (SELECT u0 from u where t1 > u1)", + {"t0", "t1"}, + core::JoinType::kLeftSemiFilter); + + // Right Semi join With filter + testSemiJoin( + "u1 > t1", + "SELECT u0, u1 FROM u where u0 IN (SELECT t0 from t where u1 > t1)", + {"u0", "u1"}, + core::JoinType::kRightSemiFilter); +} + TEST_F(MergeJoinTest, rightJoin) { auto left = makeRowVector( {"t0"}, From d696d3a45910c8af4e6bde210ef1392e06fc6a0e Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Mon, 23 Jun 2025 22:08:20 +0800 Subject: [PATCH 2/5] Resolve comments --- velox/exec/MergeJoin.cpp | 8 +++++--- velox/exec/MergeJoin.h | 9 +++------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/velox/exec/MergeJoin.cpp b/velox/exec/MergeJoin.cpp index 6ef1b6047eb6..9c42ce87ca06 100644 --- a/velox/exec/MergeJoin.cpp +++ b/velox/exec/MergeJoin.cpp @@ -1351,9 +1351,12 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { } }; - auto onMatch = [&](auto row) { - if (isLeftSemiFilterJoin(joinType_) || isRightSemiFilterJoin(joinType_)) { + auto onMatch = [&](auto row, bool& firstMatched_) { + if ((isLeftSemiFilterJoin(joinType_) || + isRightSemiFilterJoin(joinType_)) && + !firstMatched_) { rawIndices[numPassed++] = row; + firstMatched_ = true; } }; @@ -1404,7 +1407,6 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { if (!leftMatch_ || !joinTracker_->isCurrentLeftMatch(numRows - 1)) { joinTracker_->noMoreFilterResults(onMiss); } - } else { filterRows_.resize(numRows); filterRows_.setAll(); diff --git a/velox/exec/MergeJoin.h b/velox/exec/MergeJoin.h index f0dd11fcfd27..c20b710da4d6 100644 --- a/velox/exec/MergeJoin.h +++ b/velox/exec/MergeJoin.h @@ -398,8 +398,8 @@ class MergeJoin : public Operator { void processFilterResult( vector_size_t outputIndex, bool passed, - TOnMiss onMiss, - TOnMatch onMatch) { + const TOnMiss& onMiss, + const TOnMatch& onMatch) { const auto rowNumber = rawLeftRowNumbers_[outputIndex]; if (currentLeftRowNumber_ != rowNumber) { if (currentRow_ != -1 && !currentRowPassed_) { @@ -416,10 +416,7 @@ class MergeJoin : public Operator { if (passed) { currentRowPassed_ = true; - if (!firstMatched_) { - onMatch(outputIndex); - firstMatched_ = true; - } + onMatch(outputIndex, firstMatched_); } } From 2b205175d85b1150155d99356da4de45b827d852 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Tue, 24 Jun 2025 17:24:40 +0800 Subject: [PATCH 3/5] Resolve comments --- velox/exec/MergeJoin.cpp | 17 ++++++----------- velox/exec/MergeJoin.h | 2 +- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/velox/exec/MergeJoin.cpp b/velox/exec/MergeJoin.cpp index 9c42ce87ca06..df743ad39a7d 100644 --- a/velox/exec/MergeJoin.cpp +++ b/velox/exec/MergeJoin.cpp @@ -1351,12 +1351,13 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { } }; - auto onMatch = [&](auto row, bool& firstMatched_) { - if ((isLeftSemiFilterJoin(joinType_) || - isRightSemiFilterJoin(joinType_)) && - !firstMatched_) { + auto onMatch = [&](auto row, bool firstMatched_) { + bool isSemiJoin = + isLeftSemiFilterJoin(joinType_) || isRightSemiFilterJoin(joinType_); + bool isNonSemiAntiJoin = !isSemiJoin && !isAntiJoin(joinType_); + + if ((isSemiJoin && !firstMatched_) || isNonSemiAntiJoin) { rawIndices[numPassed++] = row; - firstMatched_ = true; } }; @@ -1371,12 +1372,6 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { if (!passed) { rawIndices[numPassed++] = i; } - } else if ( - !isLeftSemiFilterJoin(joinType_) && - !isRightSemiFilterJoin(joinType_)) { - if (passed) { - rawIndices[numPassed++] = i; - } } } else { // This row doesn't have a match on the right side. Keep it diff --git a/velox/exec/MergeJoin.h b/velox/exec/MergeJoin.h index c20b710da4d6..413f63610e9d 100644 --- a/velox/exec/MergeJoin.h +++ b/velox/exec/MergeJoin.h @@ -415,8 +415,8 @@ class MergeJoin : public Operator { if (passed) { currentRowPassed_ = true; - onMatch(outputIndex, firstMatched_); + firstMatched_ = true; } } From 1dd625f3d23622560e82adbdc9ad72e062083dcc Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Tue, 24 Jun 2025 19:44:11 +0800 Subject: [PATCH 4/5] Resolve comments --- velox/exec/MergeJoin.cpp | 16 +++++---- velox/exec/MergeJoin.h | 9 +----- velox/exec/tests/MergeJoinTest.cpp | 52 ++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 15 deletions(-) diff --git a/velox/exec/MergeJoin.cpp b/velox/exec/MergeJoin.cpp index df743ad39a7d..2799b1342586 100644 --- a/velox/exec/MergeJoin.cpp +++ b/velox/exec/MergeJoin.cpp @@ -289,6 +289,10 @@ void copyRow( targetChild->copy(sourceChild.get(), targetIndex, sourceIndex, 1); } } + +bool isSemiFilterJoin(core::JoinType joinType) { + return isLeftSemiFilterJoin(joinType) || isRightSemiFilterJoin(joinType); +} } // namespace inline void addNull( @@ -1278,8 +1282,7 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { // If all matches for a given left-side row fail the filter, add a row to // the output with nulls for the right-side columns. const auto onMiss = [&](auto row) { - if (isAntiJoin(joinType_) || isLeftSemiFilterJoin(joinType_) || - isRightSemiFilterJoin(joinType_)) { + if (isAntiJoin(joinType_) || isSemiFilterJoin(joinType_)) { return; } rawIndices[numPassed++] = row; @@ -1351,12 +1354,11 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { } }; - auto onMatch = [&](auto row, bool firstMatched_) { - bool isSemiJoin = - isLeftSemiFilterJoin(joinType_) || isRightSemiFilterJoin(joinType_); - bool isNonSemiAntiJoin = !isSemiJoin && !isAntiJoin(joinType_); + auto onMatch = [&](auto row, bool hasMatched_) { + const bool isNonSemiAntiJoin = + !isSemiFilterJoin(joinType_) && !isAntiJoin(joinType_); - if ((isSemiJoin && !firstMatched_) || isNonSemiAntiJoin) { + if ((isSemiFilterJoin(joinType_) && hasMatched_) || isNonSemiAntiJoin) { rawIndices[numPassed++] = row; } }; diff --git a/velox/exec/MergeJoin.h b/velox/exec/MergeJoin.h index 413f63610e9d..1f4d5d6d3988 100644 --- a/velox/exec/MergeJoin.h +++ b/velox/exec/MergeJoin.h @@ -408,15 +408,13 @@ class MergeJoin : public Operator { currentRow_ = outputIndex; currentLeftRowNumber_ = rowNumber; currentRowPassed_ = false; - firstMatched_ = false; } else { currentRow_ = outputIndex; } if (passed) { + onMatch(outputIndex, !currentRowPassed_); currentRowPassed_ = true; - onMatch(outputIndex, firstMatched_); - firstMatched_ = true; } } @@ -438,7 +436,6 @@ class MergeJoin : public Operator { currentRow_ = -1; currentRowPassed_ = false; - firstMatched_ = false; } void reset(); @@ -475,10 +472,6 @@ class MergeJoin : public Operator { // True if at least one row in a block of output rows corresponding a single // left-side row identified by 'currentRowNumber' passed the filter. bool currentRowPassed_{false}; - - // Retains only the first matching record for a semi join in scenarios - // involving filters. - bool firstMatched_{false}; }; /// Used to record both left and right join. diff --git a/velox/exec/tests/MergeJoinTest.cpp b/velox/exec/tests/MergeJoinTest.cpp index c31d676c5e0a..a76dbfe4b6d7 100644 --- a/velox/exec/tests/MergeJoinTest.cpp +++ b/velox/exec/tests/MergeJoinTest.cpp @@ -1068,6 +1068,58 @@ TEST_F(MergeJoinTest, semiJoinWithMultiMatchedRowsWithFilter) { core::JoinType::kRightSemiFilter); } +TEST_F(MergeJoinTest, semiJoinWithOneMatchedRowWithFilter) { + auto left = makeRowVector( + {"t0", "t1"}, + {makeNullableFlatVector({2, 2}), + makeNullableFlatVector({3, 5})}); + + auto right = makeRowVector( + {"u0", "u1"}, + {makeNullableFlatVector({2, 2}), + makeNullableFlatVector({1, 4})}); + + createDuckDbTable("t", {left}); + createDuckDbTable("u", {right}); + + auto testSemiJoin = [&](const std::string& filter, + const std::string& sql, + const std::vector& outputLayout, + core::JoinType joinType) { + auto planNodeIdGenerator = std::make_shared(); + auto plan = PlanBuilder(planNodeIdGenerator) + .values(split(left, 2)) + .mergeJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator) + .values(split(right, 2)) + .planNode(), + filter, + outputLayout, + joinType) + .planNode(); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .assertResults(sql); + }; + + // Left Semi join With filter + testSemiJoin( + "t1 > u1", + "SELECT t0, t1 FROM t where t0 IN (SELECT u0 from u where t1 > u1)", + {"t0", "t1"}, + core::JoinType::kLeftSemiFilter); + + // Right Semi join With filter + testSemiJoin( + "u1 > t1", + "SELECT u0, u1 FROM u where u0 IN (SELECT t0 from t where u1 > t1)", + {"u0", "u1"}, + core::JoinType::kRightSemiFilter); +} + TEST_F(MergeJoinTest, rightJoin) { auto left = makeRowVector( {"t0"}, From 04240a294c447ea0c7d9b5bb96fb8b6741f874b2 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Tue, 24 Jun 2025 20:21:03 +0800 Subject: [PATCH 5/5] Resolve comments --- velox/exec/MergeJoin.cpp | 45 ++++++++++++++++++++-------------------- velox/exec/MergeJoin.h | 2 +- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/velox/exec/MergeJoin.cpp b/velox/exec/MergeJoin.cpp index 2799b1342586..ceaad22dfed4 100644 --- a/velox/exec/MergeJoin.cpp +++ b/velox/exec/MergeJoin.cpp @@ -20,6 +20,25 @@ namespace facebook::velox::exec { +namespace { +void copyRow( + const RowVectorPtr& source, + vector_size_t sourceIndex, + const RowVectorPtr& target, + vector_size_t targetIndex, + const std::vector& projections) { + for (const auto& projection : projections) { + const auto& sourceChild = source->childAt(projection.inputChannel); + const auto& targetChild = target->childAt(projection.outputChannel); + targetChild->copy(sourceChild.get(), targetIndex, sourceIndex, 1); + } +} + +bool isSemiFilterJoin(core::JoinType joinType) { + return isLeftSemiFilterJoin(joinType) || isRightSemiFilterJoin(joinType); +} +} // namespace + MergeJoin::MergeJoin( int32_t operatorId, DriverCtx* driverCtx, @@ -91,8 +110,7 @@ void MergeJoin::initialize() { if (joinNode_->isLeftJoin() || joinNode_->isAntiJoin() || joinNode_->isRightJoin() || joinNode_->isFullJoin() || - joinNode_->isLeftSemiFilterJoin() || - joinNode_->isRightSemiFilterJoin()) { + isSemiFilterJoin(joinType_)) { joinTracker_ = JoinTracker(outputBatchSize_, pool()); } } else if (joinNode_->isAntiJoin()) { @@ -276,25 +294,6 @@ bool MergeJoin::findEndOfMatch( return true; } -namespace { -void copyRow( - const RowVectorPtr& source, - vector_size_t sourceIndex, - const RowVectorPtr& target, - vector_size_t targetIndex, - const std::vector& projections) { - for (const auto& projection : projections) { - const auto& sourceChild = source->childAt(projection.inputChannel); - const auto& targetChild = target->childAt(projection.outputChannel); - targetChild->copy(sourceChild.get(), targetIndex, sourceIndex, 1); - } -} - -bool isSemiFilterJoin(core::JoinType joinType) { - return isLeftSemiFilterJoin(joinType) || isRightSemiFilterJoin(joinType); -} -} // namespace - inline void addNull( VectorPtr& target, vector_size_t index, @@ -1354,11 +1353,11 @@ RowVectorPtr MergeJoin::applyFilter(const RowVectorPtr& output) { } }; - auto onMatch = [&](auto row, bool hasMatched_) { + auto onMatch = [&](auto row, bool firstMatch) { const bool isNonSemiAntiJoin = !isSemiFilterJoin(joinType_) && !isAntiJoin(joinType_); - if ((isSemiFilterJoin(joinType_) && hasMatched_) || isNonSemiAntiJoin) { + if ((isSemiFilterJoin(joinType_) && firstMatch) || isNonSemiAntiJoin) { rawIndices[numPassed++] = row; } }; diff --git a/velox/exec/MergeJoin.h b/velox/exec/MergeJoin.h index 1f4d5d6d3988..5a44f92f8aff 100644 --- a/velox/exec/MergeJoin.h +++ b/velox/exec/MergeJoin.h @@ -413,7 +413,7 @@ class MergeJoin : public Operator { } if (passed) { - onMatch(outputIndex, !currentRowPassed_); + onMatch(outputIndex, /*firstMatch=*/!currentRowPassed_); currentRowPassed_ = true; } }