-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Support correlated subqueries for DELETE #9447
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import io.trino.sql.planner.PlanNodeIdAllocator; | ||
| import io.trino.sql.planner.SymbolAllocator; | ||
| import io.trino.sql.planner.TypeProvider; | ||
| import io.trino.sql.planner.plan.AggregationNode; | ||
| import io.trino.sql.planner.plan.AssignUniqueId; | ||
| import io.trino.sql.planner.plan.DeleteNode; | ||
| import io.trino.sql.planner.plan.ExchangeNode; | ||
|
|
@@ -318,6 +319,9 @@ private TableHandle findTableScanHandleForDeleteOrUpdate(PlanNode node) | |
| if (node instanceof MarkDistinctNode) { | ||
| return findTableScanHandleForDeleteOrUpdate(((MarkDistinctNode) node).getSource()); | ||
| } | ||
| if (node instanceof AggregationNode) { | ||
| return findTableScanHandleForDeleteOrUpdate(((AggregationNode) node).getSource()); | ||
|
||
| } | ||
| throw new IllegalArgumentException("Invalid descendant for DeleteNode or UpdateNode: " + node.getClass().getName()); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,21 +13,25 @@ | |
| */ | ||
| package io.trino.sql.planner.optimizations; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import io.trino.Session; | ||
| import io.trino.cost.TableStatsProvider; | ||
| import io.trino.execution.warnings.WarningCollector; | ||
| import io.trino.sql.planner.PlanNodeIdAllocator; | ||
| import io.trino.sql.planner.SymbolAllocator; | ||
| import io.trino.sql.planner.TypeProvider; | ||
| import io.trino.sql.planner.plan.DeleteNode; | ||
| import io.trino.sql.planner.plan.JoinNode; | ||
| import io.trino.sql.planner.plan.JoinNode.Type; | ||
| import io.trino.sql.planner.plan.PlanNode; | ||
| import io.trino.sql.planner.plan.SemiJoinNode; | ||
| import io.trino.sql.planner.plan.SimplePlanRewriter; | ||
|
|
||
| import static io.trino.sql.planner.plan.SemiJoinNode.DistributionType.REPLICATED; | ||
| import static io.trino.sql.planner.plan.JoinNode.Type.INNER; | ||
| import static io.trino.sql.planner.plan.JoinNode.Type.LEFT; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class ReplicateSemiJoinInDelete | ||
| public class ReplicateJoinAndSemiJoinInDelete | ||
| implements PlanOptimizer | ||
| { | ||
| @Override | ||
|
|
@@ -42,26 +46,31 @@ private static class Rewriter | |
| { | ||
| private boolean isDeleteQuery; | ||
|
|
||
| @Override | ||
| public PlanNode visitJoin(JoinNode node, RewriteContext<Void> context) | ||
| { | ||
| PlanNode leftSourceRewritten = context.rewrite(node.getLeft(), context.get()); | ||
|
|
||
| // This should be applied to Joins directly between TableScan and DeleteNode, not to all Joins in the plan | ||
| JoinNode rewrittenNode = (JoinNode) node.replaceChildren(ImmutableList.of(leftSourceRewritten, node.getRight())); | ||
Praveen2112 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Type joinType = rewrittenNode.getType(); | ||
| if (isDeleteQuery && (joinType == INNER || joinType == LEFT)) { | ||
|
||
| return rewrittenNode.withDistributionType(JoinNode.DistributionType.REPLICATED); | ||
| } | ||
|
|
||
| return rewrittenNode; | ||
| } | ||
|
|
||
| @Override | ||
| public PlanNode visitSemiJoin(SemiJoinNode node, RewriteContext<Void> context) | ||
| { | ||
| PlanNode sourceRewritten = context.rewrite(node.getSource(), context.get()); | ||
| PlanNode filteringSourceRewritten = context.rewrite(node.getFilteringSource(), context.get()); | ||
|
|
||
| SemiJoinNode rewrittenNode = new SemiJoinNode( | ||
| node.getId(), | ||
| sourceRewritten, | ||
| filteringSourceRewritten, | ||
| node.getSourceJoinSymbol(), | ||
| node.getFilteringSourceJoinSymbol(), | ||
| node.getSemiJoinOutput(), | ||
| node.getSourceHashSymbol(), | ||
| node.getFilteringSourceHashSymbol(), | ||
| node.getDistributionType(), | ||
| node.getDynamicFilterId()); | ||
| SemiJoinNode rewrittenNode = (SemiJoinNode) node.replaceChildren(ImmutableList.of(sourceRewritten, filteringSourceRewritten)); | ||
|
|
||
| if (isDeleteQuery) { | ||
| return rewrittenNode.withDistributionType(REPLICATED); | ||
| return rewrittenNode.withDistributionType(SemiJoinNode.DistributionType.REPLICATED); | ||
| } | ||
|
|
||
| return rewrittenNode; | ||
|
|
@@ -74,12 +83,7 @@ public PlanNode visitDelete(DeleteNode node, RewriteContext<Void> context) | |
| // so you can't do a distributed semi-join | ||
| isDeleteQuery = true; | ||
| PlanNode rewrittenSource = context.rewrite(node.getSource()); | ||
findepi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return new DeleteNode( | ||
| node.getId(), | ||
| rewrittenSource, | ||
| node.getTarget(), | ||
| node.getRowId(), | ||
| node.getOutputSymbols()); | ||
| return node.replaceChildren(ImmutableList.of(rewrittenSource)); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why ReplicateSemiJoinInDelete/ReplicateJoinAndSemiJoinInDelete needs to be run before ReorderJoins.
I think ReplicateSemiJoinInDelete/ReplicateJoinAndSemiJoinInDelete would undo the distribution type picked by ReorderJoins. What am I missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since ReorderJoins can flip the sides of a JoinNode, we would like to avoid that for the plan nodes between Delete and TableScan. So we run this optimizer before ReorderJoins is applied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a good way to solve this issue. Optimizer rules need to be able to run regardless of which order they are applied (i.e., correctness of the plan should not depend on the order of rule application -- only query performance should be affected).
How is this PR different from what we did for #8286?