4 changes: 4 additions & 0 deletions velox/exec/NestedLoopJoinBuild.cpp
@@ -105,6 +105,10 @@ void NestedLoopJoinBuild::noMoreInput() {
    }
  }

  while (dataVectors_.size() > 1) {
    dataVectors_[0]->append(dataVectors_[dataVectors_.size() - 1].get());
Contributor:
No need to merge the vectors; you just need to change the loop order in probe to do the probe side first.
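The suggested loop-order change can be sketched standalone (hypothetical names, not the Velox operator code): with the probe side as the outer loop, each probe row is paired with every build row across all build vectors, so output stays contiguous per probe key without merging the build side.

```cpp
#include <utility>
#include <vector>

// Hypothetical sketch (not Velox code): generate cross-product index pairs
// with the probe side as the OUTER loop. All build rows for one probe row are
// emitted contiguously, so rows stay grouped by probe key even when the build
// side is split across several vectors.
std::vector<std::pair<int, int>> crossProductIndices(
    int numProbeRows,
    const std::vector<int>& buildVectorSizes) {
  std::vector<std::pair<int, int>> indices; // (probe row, global build row)
  for (int probeRow = 0; probeRow < numProbeRows; ++probeRow) {
    int buildOffset = 0;
    for (int vectorSize : buildVectorSizes) {
      for (int row = 0; row < vectorSize; ++row) {
        indices.emplace_back(probeRow, buildOffset + row);
      }
      buildOffset += vectorSize;
    }
  }
  return indices;
}
```

For 2 probe rows and two single-row build vectors, this yields (0,0), (0,1), (1,0), (1,1): probe-major order, unlike the build-major order that splits probe keys across runs.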

Collaborator:
@karteekmurthys : Agree with Jimmy. We need to change the probe loop to generate the indices differently.

Contributor (author):
One approach I tried was generating all the probe-side indices for the total build size (the sum of all build vector sizes at the time of probing). Since we compute buildSize as the sum of all build vectors, this also takes care of generating the build-side indices. What remains is the projection, which expects a single RowVector for each of the probe and build sides. At that point we are forced to merge all the build vectors so they can be projected at once, which is why I chose to merge vectors on the NestedLoopJoinBuild side. Please correct me if I'm wrong; there could be a better way to map indices to vectors.

  projectChildren(
      projectedChildren,
      buildVectors_.value()[buildIndex_], // single RowVector, but we need to project multiple into one output
      buildProjections,
      numOutputRows,
      buildIndices_);
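One possible way to map indices to vectors without merging (a hypothetical sketch, not a Velox API) is to keep cumulative build-vector sizes and binary-search a "global" build row back to a (vector, row) pair:

```cpp
#include <algorithm>
#include <utility>
#include <vector>

// Hypothetical sketch: map a global build row index back to a
// (vector index, row within vector) pair using cumulative sizes, instead of
// merging all build vectors into one RowVector. prefixSizes[i] is the total
// row count of vectors 0..i, e.g. {2, 5, 9} for vectors of sizes 2, 3, 4.
std::pair<int, int> mapGlobalRow(
    const std::vector<int>& prefixSizes,
    int globalRow) {
  // First prefix strictly greater than globalRow identifies the vector.
  auto it =
      std::upper_bound(prefixSizes.begin(), prefixSizes.end(), globalRow);
  int vectorIndex = static_cast<int>(it - prefixSizes.begin());
  int base = vectorIndex == 0 ? 0 : prefixSizes[vectorIndex - 1];
  return {vectorIndex, globalRow - base};
}
```

With such a mapping, the output batch could be assembled by projecting each source vector's slice separately rather than requiring one merged RowVector.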

Contributor (author):
Synced with @Yuhta; will try another approach and update this PR.

Contributor:
Discussed offline: merging build vectors helps avoid small output batches, so it might be worth doing here. But BaseVector::append is not the right implementation, as it is too slow. Can you create a static method in BaseVector like this?

VectorPtr BaseVector::merge(
    const std::vector<VectorPtr>& vectors,
    memory::MemoryPool* pool) {
  VELOX_CHECK(!vectors.empty());
  auto& type = vectors[0]->type();
  vector_size_t size = 0;
  for (auto& vector : vectors) {
    VELOX_CHECK(*vector->type() == *type);
    size += vector->size();
  }
  auto result = create(type, size, pool);
  size = 0;
  for (auto& vector : vectors) {
    result->copy(vector.get(), size, 0, vector->size());
    size += vector->size();
  }
  return result;
}

Contributor @Yuhta (Jun 6, 2024):
I thought about it again: merging will produce batches larger than we currently use, so there is a memory risk. @mbasmanova Do you have any opinion here? Will merging the build side in nested loop join cause any potential issues?

Collaborator @aditi-pandit (Jun 6, 2024):
@Yuhta, @mbasmanova: There is another option here. This grouping behavior from NestedLoopJoinNode is needed because of the use of StreamingAggregation in the original query.

We could change the LocalPlanner to use HashAggregation instead of StreamingAggregation when it sits above a NestedLoopJoin, for correctness.

To elaborate, the original issue came from incorrect results in the following query:

SELECT count(*) FROM orders o WHERE EXISTS(SELECT 1 FROM orders i WHERE o.orderkey < i.orderkey AND i.orderkey % 1000 = 0);

count(*) needed aggregation and the subquery needed NestedLoopJoin in Prestissimo.

In Presto Java, the same JoinNode translates to a LookupJoin, which imposes a sort order on the HashBuild side in this code: https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java#L2436. That justified the usage of StreamingAggregation.

We don't have an equivalent of such a LookupJoin in Velox, so we used NestedLoopJoin.

Since StreamingAggregation above NestedLoopJoin requires pre-grouped input, it didn't give correct results.

As demonstrated in @karteekmurthys' example:

If the NestedLoopJoin probe-side input is

1, 1
2, 2

and the build side has 2 vectors

Vector 1: 1
Vector 2: 2

the output is going to be

1, 1, 1
2, 2, 1
1, 1, 2
2, 2, 2

which is ungrouped on the probe key. StreamingAggregation would then emit two partial groups for each of the 1 and 2 keys, double counting them.

We could change local planning to use HashAggregation instead of StreamingAggregation above NestedLoopJoin to avoid the miscounting.

wdyt ?

@amitkdutta
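The correctness difference can be sketched in a few lines (illustrative only, not the Velox operators): streaming aggregation closes a group whenever the key changes, so it relies on equal keys being contiguous, while hash aggregation is order-insensitive.

```cpp
#include <map>
#include <utility>
#include <vector>

// Illustrative sketch (not the Velox operators). Streaming count(*) finalizes
// a group as soon as the key changes, so it needs equal keys to be contiguous.
std::vector<std::pair<int, int>> streamingCount(const std::vector<int>& keys) {
  std::vector<std::pair<int, int>> groups;
  for (int key : keys) {
    if (!groups.empty() && groups.back().first == key) {
      ++groups.back().second;
    } else {
      groups.emplace_back(key, 1); // key changed: previous group is final
    }
  }
  return groups;
}

// Hash count(*) keeps all groups live, so input order does not matter.
std::map<int, int> hashCount(const std::vector<int>& keys) {
  std::map<int, int> groups;
  for (int key : keys) {
    ++groups[key];
  }
  return groups;
}
```

On the ungrouped join output above, the probe keys arrive as {1, 2, 1, 2}: the streaming variant emits four partial groups of count 1 (keys 1 and 2 each appear twice), while the hash variant correctly produces {1: 2, 2: 2}.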

    dataVectors_.pop_back();
  }
  operatorCtx_->task()
      ->getNestedLoopJoinBridge(
          operatorCtx_->driverCtx()->splitGroupId, planNodeId())
42 changes: 42 additions & 0 deletions velox/exec/tests/NestedLoopJoinTest.cpp
@@ -488,3 +488,45 @@ TEST_F(NestedLoopJoinTest, allTypes) {
      "SELECT t0, u0 FROM t {0} JOIN u ON t.t0 {1} u0 AND t1 {1} u1 AND t2 {1} u2 AND t3 {1} u3 AND t4 {1} u4 AND t5 {1} u5 AND t6 {1} u6");
  runSingleAndMultiDriverTest(probeVectors, buildVectors);
}

TEST_F(NestedLoopJoinTest, testCrossProduct) {
  RowTypePtr probeType = ROW({{"t0", BIGINT()}});
  RowTypePtr buildType = ROW({{"u0", BIGINT()}});

  auto probeBatch =
      makeRowVector({"t0"}, {makeFlatVector<int64_t>({1, 2, 3, 4, 5})});
  auto buildBatch = makeRowVector({"u0"}, {makeFlatVector<int64_t>({1, 2})});
  /*
   * Probe     Build
   *  1         1  : Batch1
   *  2         2
   *  3        ---
   *  4         1  : Batch2
   *  5         2
   *           ---
   *            1  : Batch3
   *            2
   */
  auto probeVectors = std::vector<RowVectorPtr>{probeBatch};
  auto buildVectors = std::vector<RowVectorPtr>{
      buildBatch, buildBatch, buildBatch}; // 3 build batches of 2 rows each

  setProbeType(probeType);
  setBuildType(buildType);
  setQueryStr("select t0, u0 from t,u"); // cross product.
  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
  auto planNode =
      PlanBuilder(planNodeIdGenerator)
          .values(probeVectors)
          .nestedLoopJoin(
              PlanBuilder(planNodeIdGenerator)
                  .values(buildVectors)
                  .localPartition({buildKeyName_})
                  .planNode(),
              "",
              outputLayout_,
              facebook::velox::core::JoinType::kFull)
          .planNode();
  // 5 probe rows x 6 build rows = 30 output rows.
  auto expected = makeRowVector(
      {makeFlatVector<int64_t>(
           {1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3,
            4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5, 5}),
       makeFlatVector<int64_t>(
           {1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2,
            1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2})});
  assertQuery(planNode, expected);
}
2 changes: 1 addition & 1 deletion velox/vector/ComplexVector.cpp
@@ -673,7 +673,7 @@ void RowVector::resize(vector_size_t newSize, bool setNotNull) {
  // to skip uniqueness check since effectively we are just changing
  // the length.
  if (newSize > oldSize) {
-   VELOX_CHECK(child.unique(), "Resizing shared child vector");
+   // VELOX_CHECK(child.unique(), "Resizing shared child vector");
Collaborator:
Why have you commented out this line?

Contributor (author):
This check makes sure we don't resize vectors whose children are shared. I will try an alternate approach like you and @Yuhta suggested.
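The hazard that check guards against can be sketched with plain shared_ptr (illustrative, not Velox types): when a child buffer is shared, resizing it through one owner changes what every other owner observes.

```cpp
#include <memory>
#include <vector>

// Illustrative sketch (not Velox types): two owners share one child buffer.
// Resizing through the first owner silently changes the second owner's view,
// which is what the child.unique() check in resize() was protecting against.
size_t sizeSeenBySecondOwner() {
  auto child = std::make_shared<std::vector<int>>(std::vector<int>{1, 2, 3});
  auto ownerA = child; // "row vector" A shares the child
  auto ownerB = child; // "row vector" B shares the same child
  ownerA->resize(5);   // resize via A...
  return ownerB->size(); // ...B observes the new size too.
}
```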

      child->resize(newSize, setNotNull);
    }
  }