Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions velox/exec/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,34 @@ RowVectorPtr MergeJoin::getOutput() {
}
}

// Produces output for the right-side rows located between 'rightRowIndex_' and
// the index reported by firstNonNull(rightInput_, rightKeyChannels_). Does
// nothing unless the join is a right or full join and that index lies past
// 'rightRowIndex_'.
//
// Returns a non-null vector when 'output_' should be handed back to the
// caller, nullptr otherwise. When the right batch is exhausted while adding
// rows, 'rightInput_' is reset to nullptr and nullptr is returned, so callers
// need to check 'rightInput_' in addition to the return value.
RowVectorPtr MergeJoin::handleRightSideNullRows() {
  const auto firstNonNullRow = firstNonNull(rightInput_, rightKeyChannels_);

  const bool emitsRightOnlyRows =
      isRightJoin(joinType_) || isFullJoin(joinType_);
  if (!emitsRightOnlyRows || !(firstNonNullRow > rightRowIndex_)) {
    // Nothing to do for this join type or there are no pending rows.
    return nullptr;
  }

  if (prepareOutput(nullptr, rightInput_)) {
    // prepareOutput() signalled that the current output must be returned
    // before any more rows are added.
    output_->resize(outputSize_);
    return std::move(output_);
  }

  int row = rightRowIndex_;
  while (row < firstNonNullRow) {
    if (!tryAddOutputRowForRightJoin()) {
      // The row could not be added; remember where to resume and return what
      // has been accumulated so far.
      rightRowIndex_ = row;
      return std::move(output_);
    }

    if (finishedRightBatch()) {
      // Ran out of rows on the right side.
      rightInput_ = nullptr;
      return nullptr;
    }

    ++row;
  }

  // All pending rows were processed; continue from the first non-null row.
  rightRowIndex_ = firstNonNullRow;
  return nullptr;
}

RowVectorPtr MergeJoin::doGetOutput() {
// Check if we ran out of space in the output vector in the middle of the
// match.
Expand Down Expand Up @@ -1060,6 +1088,12 @@ RowVectorPtr MergeJoin::doGetOutput() {
return nullptr;
}

const auto output = handleRightSideNullRows();
if (output != nullptr || rightInput_ == nullptr) {
return output;
}
VELOX_CHECK_NOT_NULL(rightInput_);

// Look for a new match starting with index_ row on the left and rightIndex_
// row on the right.
auto compareResult = compare();
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/MergeJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class MergeJoin : public Operator {
const RowTypePtr& leftType,
const RowTypePtr& rightType);

// Handles null rows on the right side for right and full joins. Returns an
// output vector when one is ready to be returned, otherwise nullptr.
RowVectorPtr handleRightSideNullRows();

RowVectorPtr doGetOutput();

static int32_t compare(
Expand Down
46 changes: 46 additions & 0 deletions velox/exec/tests/MergeJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,52 @@ TEST_F(MergeJoinTest, rightJoinFilterWithNull) {
.assertResults("SELECT * from t RIGHT JOIN u ON a = c AND b < d");
}

// Verifies merge join results when the leading rows on both sides have null
// join keys. The left input starts with one null-key row and the right input
// starts with two; each side then has a single row with key 3. The join uses
// the extra filter 'b < d' and is checked against DuckDB for right, left and
// inner joins.
TEST_F(MergeJoinTest, firstRowsNull) {
  // Left side: (null, null), (3, 3).
  auto left = makeRowVector(
      {"a", "b"},
      {
          makeNullableFlatVector<int32_t>({std::nullopt, 3}),
          makeNullableFlatVector<double>({std::nullopt, 3}),
      });

  // Right side: (null, null), (null, null), (3, 4).
  auto right = makeRowVector(
      {"c", "d"},
      {
          makeNullableFlatVector<int32_t>({std::nullopt, std::nullopt, 3}),
          makeNullableFlatVector<double>({std::nullopt, std::nullopt, 4}),
      });

  createDuckDbTable("t", {left});
  createDuckDbTable("u", {right});

  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();

  // Builds a merge join plan on a = c with filter b < d for the given join
  // type, projecting all four columns.
  auto plan = [&](core::JoinType type) {
    return PlanBuilder(planNodeIdGenerator)
        .values({left})
        .mergeJoin(
            {"a"},
            {"c"},
            PlanBuilder(planNodeIdGenerator).values({right}).planNode(),
            "b < d",
            {"a", "b", "c", "d"},
            type)
        .planNode();
  };

  // Right Join
  AssertQueryBuilder(plan(core::JoinType::kRight), duckDbQueryRunner_)
      .assertResults("SELECT * from t RIGHT JOIN u ON a = c AND b < d");

  // Left Join
  AssertQueryBuilder(plan(core::JoinType::kLeft), duckDbQueryRunner_)
      .assertResults("SELECT * from t Left JOIN u ON a = c AND b < d");

  // Inner Join
  AssertQueryBuilder(plan(core::JoinType::kInner), duckDbQueryRunner_)
      .assertResults("SELECT * from t, u where a = c AND b < d");
}

// Verify that both left-side and right-side pipelines feeding the merge join
// always run single-threaded.
TEST_F(MergeJoinTest, numDrivers) {
Expand Down
Loading