Skip to content

[WIP] Merge NestedLoopJoin Build vectors#10048

Closed
karteekmurthys wants to merge 1 commit intofacebookincubator:mainfrom
karteekmurthys:nestedloop-single-parse
Closed

[WIP] Merge NestedLoopJoin Build vectors#10048
karteekmurthys wants to merge 1 commit intofacebookincubator:mainfrom
karteekmurthys:nestedloop-single-parse

Conversation

@karteekmurthys
Copy link
Copy Markdown
Contributor

Fixes prestodb/presto#22585.

Current implementation of NestedLoopJoin produces ungrouped final results which breaks the StreamingAggregation, which expects input to the aggregate operator is pre-grouped. Based on this fact, the StreamingAgg dedups the rows.

Previous:

In NestedLoop Probe side if the input is

1, 1
2, 2

and build side you have 2 vectors

Vector 1 : 1
Vector 2: 2

The output is still going to be

1, 1, 1
2, 2, 1
1, 1, 2
2, 2, 2

which is ungrouped.

With this PR change, the above output would look like:

1, 1, 1
1, 1, 2
2, 2, 1
2, 2, 2

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jun 4, 2024
@netlify
Copy link
Copy Markdown

netlify bot commented Jun 4, 2024

Deploy Preview for meta-velox ready!

Name Link
🔨 Latest commit 5999b9e
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/665f94733ed2700008ae79d3
😎 Deploy Preview https://deploy-preview-10048--meta-velox.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify site configuration.

@Yuhta Yuhta requested a review from mbasmanova June 4, 2024 22:33
}

while (dataVectors_.size() > 1) {
dataVectors_[0]->append(dataVectors_[dataVectors_.size()-1].get());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to merge the vector, you just need to change the loop order in probe to do the probe side first

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karteekmurthys : Agree with Jimmy. We need to change the probe loop to generate the indices differently.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One approach I tried was generating all the probe side indices for total build vector size (sum of all build vectors at the time of probing). Since we compute buildSize as sum of all build vectors, this also takes care of generating the build side indices. Now, all we need to handle is projections which expects a single RowVector for probe and build side. It seems like at this point we are forced to merge all our build vectors to be projected at once. That is why I chose to merge vectors in NestedLoopBuild side. Please CMIW there could be a better way to map indices to vectors.

  projectChildren(
      projectedChildren,
      buildVectors_.value()[buildIndex_], <- Single RowVector, but we need to project multiple into single output.
      buildProjections,
      numOutputRows,
      buildIndices_);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synced with @Yuhta will try another approach and update this PR.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, merging build vectors help avoiding small output batches so might worth doing here. But BaseVector::append is not the right implementation here as it's too slow. Can you create a static method in BaseVector like this?

VectorPtr BaseVector::merge(const std::vector<VectorPtr>& vectors, memory::MemoryPool* pool) {
  VELOX_CHECK(!vector.empty());
  auto& type = vectors[0].type();
  vector_size_t size = 0;
  for (auto& vector : vectors) {
    VELOX_CHECK_EQ(*vector->type(), *type);
    size += vector->size();
  }
  auto result = create(type, size, pool);
  size = 0;
  for (auto& vector : vectors) {
    result->copy(vector.get(), size, 0, vector->size());
    size += vector->size();
  }  
  return result;
}

Copy link
Copy Markdown
Contributor

@Yuhta Yuhta Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think about it again, merging will produce batches larger than we currently use, so have a risk memory-wise. @mbasmanova Do you have any opinion here? Will merging the build side in nested loop causing any potential issue?

Copy link
Copy Markdown
Collaborator

@aditi-pandit aditi-pandit Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yuhta, @mbasmanova : There is another option here. This grouping behavior from NestedLoopJoinNode is needed because of the use of StreamingAggregation in the original query.

We could change the LocalPlanner to use HashAggregation instead of the StreamingAggregation when used over a NestedLoopJoin instead for the correctness.

To elaborate, the original issue came from incorrect results in the following query:

SELECT count(*) FROM orders o WHERE EXISTS(SELECT 1 FROM orders i WHERE o.orderkey < i.orderkey AND i.orderkey % 1000 = 0);

count(*) needed aggregation and the subquery needed NestedLoopJoin in Prestissimo.

In Presto Java -- the same JoinNode translates to LookupJoin which imposes a sorting on the HashBuild side in this code https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java#L2436. So it justified the usage of StreamingAggregation.

We don't have an equivalent for such a LookupJoin in Velox and used NestedLoopJoin.

Since StreamingAggregation above NestedLoopJoin requires pre-grouping, it didn't give correct results.

As demonstrated in @karteekmurthys example

In NestedLoop Probe side if the input is

1, 1
2, 2

and build side you have 2 vectors

Vector 1 : 1
Vector 2: 2

The output is going to be

1, 1, 1
2, 2, 1
1, 1, 2
2, 2, 2

which is ungrouped.

This double counts the groups for both 1 and 2 keys.

We could change in local planning to use HashAggregation instead of StreamingAggregation above NestedLoopJoin to avoid the mis-counting.

wdyt ?

@amitkdutta

@karteekmurthys karteekmurthys self-assigned this Jun 4, 2024
// the length.
if (newSize > oldSize) {
VELOX_CHECK(child.unique(), "Resizing shared child vector");
// VELOX_CHECK(child.unique(), "Resizing shared child vector");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have you commented this line ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check makes sure if the vectors are shared we don't resize them. I will try an alternate approach like you and @Yuhta suggested.

@mbasmanova
Copy link
Copy Markdown
Contributor

@karteekmurthys @aditi-pandit Catching up on this issue. It sounds like you found that Presto optimizer assumed that NLJ produces output in a particular order. Is this the case? Would you share a pointer to that assumption? I remember seeing that Presto optimizer has some assumptions about the LEFT JOIN, but not sure about NLJ.

Before we proceed with the solution, let's investigate optimizer a bit more to come up with a full list of assumptions like this. Let's then create GitHub issue to describe this assumption and discuss how we can satisfy that in Velox.

@aditi-pandit
Copy link
Copy Markdown
Collaborator

aditi-pandit commented Jun 8, 2024

@karteekmurthys @aditi-pandit Catching up on this issue. It sounds like you found that Presto optimizer assumed that NLJ produces output in a particular order. Is this the case? Would you share a pointer to that assumption? I remember seeing that Presto optimizer has some assumptions about the LEFT JOIN, but not sure about NLJ.

Before we proceed with the solution, let's investigate optimizer a bit more to come up with a full list of assumptions like this. Let's then create GitHub issue to describe this assumption and discuss how we can satisfy that in Velox.

@mbasmanova : Well, Presto Java JoinNode translates to NestedLoopJoin only for cross join. This case with a LEFT OUTER with non-equal filter condition translates to a LookupJoin during physical planning in LocalExecutionPlanner for this case https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java#L1958... and the LookupJoinSourceFactory has means to access sortChannels https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java#L2390

The EXPLAIN plan snippet for LeftJoin also shows the SortExpression property

 - Aggregate(STREAMING)[orderkey, unique][PlanNodeId 263] => [orderkey:bigint, unique:bigint, count_21:bigint]                                                                                                            
    Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: ?, network: 11.00} count_21 := "presto.default.count"((non_null))                                                                                                                                                                                                                                                                                        
     - LeftJoin[PlanNodeId 262][(orderkey) < (orderkey_0)] => [orderkey:bigint, unique:bigint, non_null:boolean]                                                                                                         
        Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 11.00, network: 11.00}                                                                                                                 
          Distribution: REPLICATED                                                                                                                                                                                     
          **SortExpression[orderkey_0]**                                                                                                                                                                                   
      - AssignUniqueId[PlanNodeId 261] => [orderkey:bigint, unique:bigint]             

There isn't an equivalent in Velox for LookupJoin unless you had ideas around some other ideas for it.

Prestissimo translates JoinNode to NestedLoopJoin here and there isn't any concept of sortChannels or preGrouping for it https://github.com/prestodb/presto/blob/master/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp#L1122

The Velox plan fragment had been resolved to StreamingAggregation already which has pre-grouped Channels. That might be becasue of the sortExpression carried in the contexts.

@mbasmanova
Copy link
Copy Markdown
Contributor

@aditi-pandit Aditi, looks like the plan translation in Prestissimo has a bug. It ignores 'sortChannels' information in the join node incorrectly. An immediate fix would be to change the translation logic to fail to translate join nodes with sorting requirements. The next step would be to design a proper solution to support this use case.

@mbasmanova
Copy link
Copy Markdown
Contributor

We may need to study this PR prestodb/presto#8614 to understand this use case.

@aditi-pandit
Copy link
Copy Markdown
Collaborator

aditi-pandit commented Jun 10, 2024

@aditi-pandit Aditi, looks like the plan translation in Prestissimo has a bug. It ignores 'sortChannels' information in the join node incorrectly. An immediate fix would be to change the translation logic to fail to translate join nodes with sorting requirements. The next step would be to design a proper solution to support this use case.

@mbasmanova : The JoinNode serialized to Prestissimo doesn't have sortChannels field as far as I can see

struct JoinNode : public PlanNode {
  JoinType type = {};
  std::shared_ptr<PlanNode> left = {};
  std::shared_ptr<PlanNode> right = {};
  List<EquiJoinClause> criteria = {};
  List<VariableReferenceExpression> outputVariables = {};
  std::shared_ptr<std::shared_ptr<RowExpression>> filter = {};
  std::shared_ptr<VariableReferenceExpression> leftHashVariable = {};
  std::shared_ptr<VariableReferenceExpression> rightHashVariable = {};
  std::shared_ptr<JoinDistributionType> distributionType = {};
  Map<String, VariableReferenceExpression> dynamicFilters = {};

  JoinNode() noexcept;
};

Is it something else you are referring to ?

@karteekmurthys
Copy link
Copy Markdown
Contributor Author

We are not taking this approach of modifying nestedloop join as of now. We will avoid using pregrouped keys when source is nestedloop join, the fix is here: prestodb/presto#22998

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Native] - Correlated query returning incorrect results (Streaming agg assumes nested join produces pre-grouped results)

5 participants