feat(optimizer): Add PushSemiJoinThroughUnion iterative rule#27176
feat(optimizer): Add PushSemiJoinThroughUnion iterative rule#27176feilong-liu merged 1 commit intoprestodb:masterfrom
Conversation
Reviewer's GuideIntroduces a new iterative planner rule PushSemiJoinThroughUnion, wires it into the optimizer pipeline, and adds config/session properties and tests so semi-joins on the probe side can be pushed through unions (optionally through an intermediate project) under a gated feature flag. Sequence diagram for applying PushSemiJoinThroughUnion rulesequenceDiagram
participant Session
participant IterativeOptimizer
participant PushSemiJoinThroughUnion
participant Context
participant Lookup
participant SemiJoinNode
participant UnionNode
participant ProjectNode
participant VariableAllocator
participant PlanNodeIdAllocator
Session->>IterativeOptimizer: optimize(plan, session)
IterativeOptimizer->>PushSemiJoinThroughUnion: isEnabled(session)
PushSemiJoinThroughUnion->>Session: getSystemProperty(PUSH_SEMI_JOIN_THROUGH_UNION, Boolean)
Session-->>PushSemiJoinThroughUnion: Boolean
PushSemiJoinThroughUnion-->>IterativeOptimizer: enabled?
IterativeOptimizer->>PushSemiJoinThroughUnion: apply(semiJoinNode, captures, context)
PushSemiJoinThroughUnion->>Context: getLookup()
Context-->>PushSemiJoinThroughUnion: Lookup
PushSemiJoinThroughUnion->>Lookup: resolve(semiJoinNode.getSource())
Lookup-->>PushSemiJoinThroughUnion: sourcePlanNode
alt source is UnionNode
PushSemiJoinThroughUnion->>PushSemiJoinThroughUnion: pushThroughUnion(semiJoinNode, unionNode, empty, context)
else source is ProjectNode over UnionNode
PushSemiJoinThroughUnion->>PushSemiJoinThroughUnion: pushThroughUnion(semiJoinNode, unionNode, projectNode, context)
end
loop for each union branch
PushSemiJoinThroughUnion->>Context: getVariableAllocator()
Context-->>PushSemiJoinThroughUnion: VariableAllocator
PushSemiJoinThroughUnion->>VariableAllocator: newVariable(expression or template)
VariableAllocator-->>PushSemiJoinThroughUnion: VariableReferenceExpression
PushSemiJoinThroughUnion->>Context: getIdAllocator()
Context-->>PushSemiJoinThroughUnion: PlanNodeIdAllocator
PushSemiJoinThroughUnion->>PlanNodeIdAllocator: getNextId()
PlanNodeIdAllocator-->>PushSemiJoinThroughUnion: PlanNodeId
PushSemiJoinThroughUnion->>PushSemiJoinThroughUnion: build branch SemiJoinNode
end
PushSemiJoinThroughUnion->>PlanNodeIdAllocator: getNextId()
PlanNodeIdAllocator-->>PushSemiJoinThroughUnion: PlanNodeId
PushSemiJoinThroughUnion->>PushSemiJoinThroughUnion: build new UnionNode with per-branch SemiJoinNode
PushSemiJoinThroughUnion-->>IterativeOptimizer: Result.ofPlanNode(newUnionNode)
IterativeOptimizer-->>Session: optimized plan
Class diagram for PushSemiJoinThroughUnion rule and related plan nodesclassDiagram
class PushSemiJoinThroughUnion {
+Pattern getPattern()
+boolean isEnabled(Session session)
+Result apply(SemiJoinNode semiJoinNode, Captures captures, Context context)
-Result pushThroughUnion(SemiJoinNode semiJoinNode, UnionNode unionNode, Optional projectNode, Context context)
}
class Rule_SemiJoinNode {
<<interface>>
+Pattern getPattern()
+boolean isEnabled(Session session)
+Result apply(SemiJoinNode semiJoinNode, Captures captures, Context context)
}
class SemiJoinNode {
+PlanNode getSource()
+PlanNode getFilteringSource()
+VariableReferenceExpression getSourceJoinVariable()
+VariableReferenceExpression getFilteringSourceJoinVariable()
+VariableReferenceExpression getSemiJoinOutput()
+Optional getSourceHashVariable()
+Optional getFilteringSourceHashVariable()
+List getOutputVariables()
+Optional getDistributionType()
}
class UnionNode {
+List getSources()
+List getOutputVariables()
+Map sourceVariableMap(int index)
}
class ProjectNode {
+PlanNode getSource()
+Assignments getAssignments()
+Object getLocality()
}
class Context {
+Lookup getLookup()
+PlanNodeIdAllocator getIdAllocator()
+VariableAllocator getVariableAllocator()
}
class Lookup {
+PlanNode resolve(PlanNode node)
}
class VariableAllocator {
+VariableReferenceExpression newVariable(RowExpression expression)
+VariableReferenceExpression newVariable(VariableReferenceExpression template)
}
class PlanNodeIdAllocator {
+PlanNodeId getNextId()
}
class RowExpressionVariableInliner {
+static RowExpression inlineVariables(Map mapping, RowExpression expression)
}
PushSemiJoinThroughUnion ..|> Rule_SemiJoinNode
PushSemiJoinThroughUnion --> SemiJoinNode
PushSemiJoinThroughUnion --> UnionNode
PushSemiJoinThroughUnion --> ProjectNode
PushSemiJoinThroughUnion --> Context
Context --> Lookup
Context --> VariableAllocator
Context --> PlanNodeIdAllocator
PushSemiJoinThroughUnion ..> RowExpressionVariableInliner
UnionNode --> PlanNode
ProjectNode --> PlanNode
SemiJoinNode --> PlanNode
Class diagram for config and session properties of pushSemiJoinThroughUnionclassDiagram
class FeaturesConfig {
-boolean pushSemiJoinThroughUnion
+boolean isPushSemiJoinThroughUnion()
+FeaturesConfig setPushSemiJoinThroughUnion(boolean pushSemiJoinThroughUnion)
-boolean pushAggregationThroughJoin
+boolean isPushAggregationThroughJoin()
+FeaturesConfig setPushAggregationThroughJoin(boolean value)
}
class SystemSessionProperties {
<<utility>>
+String PUSH_SEMI_JOIN_THROUGH_UNION
+SystemSessionProperties(FeaturesConfig featuresConfig, Map properties)
+static boolean isPushSemiJoinThroughUnion(Session session)
+static boolean shouldPushAggregationThroughJoin(Session session)
}
class Session {
+Object getSystemProperty(String key, Class type)
}
FeaturesConfig <.. SystemSessionProperties : uses
SystemSessionProperties ..> Session : reads property
Flow diagram for configuration and use of push_semi_join_through_unionflowchart TD
A["Configure optimizer.push-semi-join-through-union in FeaturesConfig"] --> B["Construct SystemSessionProperties with FeaturesConfig"]
B --> C["Register session property push_semi_join_through_union
with default from FeaturesConfig.isPushSemiJoinThroughUnion"]
C --> D["Create Session with value for push_semi_join_through_union"]
D --> E["Build PlanOptimizers pipeline"]
E --> F["Add IterativeOptimizer containing PushSemiJoinThroughUnion rule"]
F --> G["IterativeOptimizer checks rule.isEnabled(session)
using SystemSessionProperties.isPushSemiJoinThroughUnion"]
G --> H{"push_semi_join_through_union is true?"}
H -- "no" --> I["Rule disabled, no SemiJoin push through Union"]
H -- "yes" --> J["Apply PushSemiJoinThroughUnion to SemiJoinNode over Union or Project over Union"]
J --> K["Produce new Union of per-branch SemiJoinNode plan"]
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- When building the per-branch SemiJoinNode instances in PushSemiJoinThroughUnion, the dynamic filter mappings from the original semiJoinNode are dropped (last constructor argument is ImmutableMap.of()), which will silently disable any dynamic filtering on these joins; consider propagating semiJoinNode.getDynamicFilters() instead.
- In pushThroughUnion, the rule assumes that the union/project mappings always contain entries for semiJoinNode.getSourceJoinVariable() and any source hash variable; it would be safer to validate these are present and bail out (Result.empty()) if a mapping is missing to avoid potential NPEs or incorrect plans when the shape slightly differs from the expected patterns.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- When building the per-branch SemiJoinNode instances in PushSemiJoinThroughUnion, the dynamic filter mappings from the original semiJoinNode are dropped (last constructor argument is ImmutableMap.of()), which will silently disable any dynamic filtering on these joins; consider propagating semiJoinNode.getDynamicFilters() instead.
- In pushThroughUnion, the rule assumes that the union/project mappings always contain entries for semiJoinNode.getSourceJoinVariable() and any source hash variable; it would be safer to validate these are present and bail out (Result.empty()) if a mapping is missing to avoid potential NPEs or incorrect plans when the shape slightly differs from the expected patterns.
## Individual Comments
### Comment 1
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushSemiJoinThroughUnion.java:191-192` </location>
<code_context>
+ newSemiJoinOutput,
+ mappedSourceHashVar,
+ semiJoinNode.getFilteringSourceHashVariable(),
+ semiJoinNode.getDistributionType(),
+ ImmutableMap.of());
+
+ newSources.add(newSemiJoin);
</code_context>
<issue_to_address>
**issue (bug_risk):** Existing semi-join dynamic filters are being dropped when creating branch semi-joins.
These `SemiJoinNode` instances are created with `ImmutableMap.of()` for dynamic filters, so any dynamic filters on the original `semiJoinNode` are dropped. If the original semi-join defines dynamic filters, this will alter query behavior and potentially degrade performance. Please propagate `semiJoinNode.getDynamicFilters()` (or the equivalent) so each branch semi-join retains the original dynamic filters.
</issue_to_address>
### Comment 2
<location> `presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestPushSemiJoinThroughUnion.java:64` </location>
<code_context>
+ }
+
+ @Test
+ public void testPushThroughTwoBranchUnion()
+ {
+ tester().assertThat(new PushSemiJoinThroughUnion())
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test case covering semi join hash variables being correctly remapped through the union (and project) branches
In `PushSemiJoinThroughUnion#pushThroughUnion`, both `sourceHashVariable` and `filteringSourceHashVariable` are remapped per branch, but none of the tests exercise semi joins with hash variables set. Please add at least one test (for union, and ideally project-over-union) where the `SemiJoinNode` has non-empty hash variables, and use the plan matcher to assert that the per-branch semi joins preserve and correctly remap these hash symbols.
Suggested implementation:
```java
.setSystemProperty(PUSH_SEMI_JOIN_THROUGH_UNION, "true")
.on(p -> {
VariableReferenceExpression sourceJoinVar = p.variable("sourceJoinVar");
VariableReferenceExpression filterJoinVar = p.variable("filterJoinVar");
VariableReferenceExpression semiJoinOutput = p.variable("semiJoinOutput", BOOLEAN);
VariableReferenceExpression sourceHashVar = p.variable("sourceHashVar", BIGINT);
VariableReferenceExpression filterHashVar = p.variable("filterHashVar", BIGINT);
return p.semiJoin(
sourceJoinVar,
filterJoinVar,
semiJoinOutput,
Optional.of(sourceHashVar),
```
To fully implement the review comment, you should also:
1. **Complete the semi join construction with non-empty filtering hash**
Wherever this `p.semiJoin(...)` call currently continues with a second `Optional.empty()` (for `filteringSourceHashVariable`), replace it with:
```java
Optional.of(filterHashVar),
```
so the `SemiJoinNode` has both hash variables set.
2. **Add a matcher for the union case ensuring hash symbols are preserved and remapped**
Extend this test (or create a dedicated `testPushThroughTwoBranchUnionWithHashes`) to use the plan matcher, something along the lines of:
```java
.matches(
semiJoin(
"sourceJoinVar",
"filterJoinVar",
"semiJoinOutput",
Optional.of("sourceHashVar"),
Optional.of("filterHashVar"),
anyTree(
union(
// each branch should expose branch-specific hash symbols
project(
ImmutableMap.of(
"sourceJoinVar", expression("..."),
"sourceHashVar", expression("...")),
values("...")),
project(
ImmutableMap.of(
"sourceJoinVar", expression("..."),
"sourceHashVar", expression("...")),
values("...")))),
anyTree(
union(
project(
ImmutableMap.of(
"filterJoinVar", expression("..."),
"filterHashVar", expression("...")),
values("...")),
project(
ImmutableMap.of(
"filterJoinVar", expression("..."),
"filterHashVar", expression("...")),
values("..."))))))
);
```
Adjust symbol names and expressions to match how the test builds the branches. The key is:
* The top-level `SemiJoinNode` has `sourceHashVar` / `filterHashVar` set.
* Each union branch has its own projected hash symbol which is mapped to the top-level hash symbol by the union.
3. **Add a separate test for project-over-union with hash variables**
Add a new test method (e.g. `testPushThroughProjectOverUnionWithHashVariables`) that:
* Builds a plan where `SemiJoinNode` is above a `Project` over a `Union`.
* Sets `sourceHashVariable` and `filteringSourceHashVariable` on the `SemiJoinNode`.
* Uses the matcher to assert that, after the rule fires, the semi joins pushed into each branch still have appropriate per-branch hash symbols, and the final union/project correctly remaps them back to the top-level hash symbol.
Because only a fragment of the file is visible, you'll need to:
* Import `BIGINT` if it's not already imported in this test class.
* Import any additional plan matcher utilities used in the new `.matches(...)` assertions (e.g., `semiJoin`, `union`, `project`, `anyTree`, `values`).
* Mirror the existing style and helper usage in this test class when defining the new tests and matchers.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
...e/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushSemiJoinThroughUnion.java
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| @Test | ||
| public void testPushThroughTwoBranchUnion() |
There was a problem hiding this comment.
suggestion (testing): Add a test case covering semi join hash variables being correctly remapped through the union (and project) branches
In PushSemiJoinThroughUnion#pushThroughUnion, both sourceHashVariable and filteringSourceHashVariable are remapped per branch, but none of the tests exercise semi joins with hash variables set. Please add at least one test (for union, and ideally project-over-union) where the SemiJoinNode has non-empty hash variables, and use the plan matcher to assert that the per-branch semi joins preserve and correctly remap these hash symbols.
Suggested implementation:
.setSystemProperty(PUSH_SEMI_JOIN_THROUGH_UNION, "true")
.on(p -> {
VariableReferenceExpression sourceJoinVar = p.variable("sourceJoinVar");
VariableReferenceExpression filterJoinVar = p.variable("filterJoinVar");
VariableReferenceExpression semiJoinOutput = p.variable("semiJoinOutput", BOOLEAN);
VariableReferenceExpression sourceHashVar = p.variable("sourceHashVar", BIGINT);
VariableReferenceExpression filterHashVar = p.variable("filterHashVar", BIGINT);
return p.semiJoin(
sourceJoinVar,
filterJoinVar,
semiJoinOutput,
Optional.of(sourceHashVar),To fully implement the review comment, you should also:
-
Complete the semi join construction with non-empty filtering hash
Wherever thisp.semiJoin(...)call currently continues with a secondOptional.empty()(forfilteringSourceHashVariable), replace it with:Optional.of(filterHashVar),
so the
SemiJoinNodehas both hash variables set. -
Add a matcher for the union case ensuring hash symbols are preserved and remapped
Extend this test (or create a dedicatedtestPushThroughTwoBranchUnionWithHashes) to use the plan matcher, something along the lines of:.matches( semiJoin( "sourceJoinVar", "filterJoinVar", "semiJoinOutput", Optional.of("sourceHashVar"), Optional.of("filterHashVar"), anyTree( union( // each branch should expose branch-specific hash symbols project( ImmutableMap.of( "sourceJoinVar", expression("..."), "sourceHashVar", expression("...")), values("...")), project( ImmutableMap.of( "sourceJoinVar", expression("..."), "sourceHashVar", expression("...")), values("...")))), anyTree( union( project( ImmutableMap.of( "filterJoinVar", expression("..."), "filterHashVar", expression("...")), values("...")), project( ImmutableMap.of( "filterJoinVar", expression("..."), "filterHashVar", expression("...")), values("...")))))) );
Adjust symbol names and expressions to match how the test builds the branches. The key is:
- The top-level
SemiJoinNodehassourceHashVar/filterHashVarset. - Each union branch has its own projected hash symbol which is mapped to the top-level hash symbol by the union.
- The top-level
-
Add a separate test for project-over-union with hash variables
Add a new test method (e.g.testPushThroughProjectOverUnionWithHashVariables) that:- Builds a plan where
SemiJoinNodeis above aProjectover aUnion. - Sets
sourceHashVariableandfilteringSourceHashVariableon theSemiJoinNode. - Uses the matcher to assert that, after the rule fires, the semi joins pushed into each branch still have appropriate per-branch hash symbols, and the final union/project correctly remaps them back to the top-level hash symbol.
- Builds a plan where
Because only a fragment of the file is visible, you'll need to:
- Import
BIGINTif it's not already imported in this test class. - Import any additional plan matcher utilities used in the new
.matches(...)assertions (e.g.,semiJoin,union,project,anyTree,values). - Mirror the existing style and helper usage in this test class when defining the new tests and matchers.
Push SemiJoinNode through UnionNode on the probe/source side, creating per-branch semi joins that share the filtering source. Also handles the case where a ProjectNode sits between the SemiJoin and Union by pushing the project into each union branch before creating the per-branch semi joins. Gated behind push_semi_join_through_union session property (default false).
d183b31 to
a243e1a
Compare
|
Hi @kaikalur, thanks for this PR! As part of the release process — do you think this change warrants a release note? If so, would you like to add one? Happy to help if you'd prefer. |
Summary
PushSemiJoinThroughUnionthat pushes aSemiJoinNodethrough aUnionNodeon the probe/source side, creating per-branch semi joins that share the filtering sourceProjectNodesits between the SemiJoin and Union (SemiJoin -> Project -> Union) by pushing the project into each union branch before creating per-branch semi joinspush_semi_join_through_unionsession property (defaultfalse) /optimizer.push-semi-join-through-unionconfig propertyTransformation
becomes:
With a project between SemiJoin and Union:
becomes:
Test plan
TestPushSemiJoinThroughUnion— 5 test cases covering:TestFeaturesConfig— verifies default/non-default config mappingmvn test -pl presto-main-base -Dtest="TestPushSemiJoinThroughUnion,TestFeaturesConfig"Summary by Sourcery
Introduce an optimizer rule that pushes semi joins through unions and gate it behind configurable feature flags.
New Features:
Enhancements:
Tests: