Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,14 @@ public PlanNode optimize(PlanNode plan, Session session, TypeProvider types, Pla
TRUE_CONSTANT);
}

public static RowExpression createDynamicFilterExpression(String id, VariableReferenceExpression input, FunctionAndTypeManager functionAndTypeManager)
public static RowExpression createDynamicFilterExpression(String id, RowExpression input, FunctionAndTypeManager functionAndTypeManager)
{
return createDynamicFilterExpression(id, input, functionAndTypeManager, EQUAL.name());
}

private static RowExpression createDynamicFilterExpression(
String id,
VariableReferenceExpression input,
RowExpression input,
FunctionAndTypeManager functionAndTypeManager,
String operator)
{
Expand Down Expand Up @@ -628,7 +628,7 @@ private static DynamicFiltersResult createDynamicFilters(
{
Map<String, VariableReferenceExpression> dynamicFilters = ImmutableMap.of();
List<RowExpression> predicates = ImmutableList.of();
if (node.getType() == INNER) {
if (node.getType() == INNER || node.getType() == RIGHT) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not left join? Are you assuming right side is going to be the build?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, dynamic filtering applies to broadcast join, and the right side is the build.
since dynamic filtering is generating predicates and propagate to the probe side, right join is fine, but left join might not be correct.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you are assuming that RIGHT join will never be rewritten to LEFT due to some other optimizations? I'm just wondering if this is useful. Please add a real usecase and also more comprehensive testing (including hive end to end) and also a session param to control this feature (with defualt off) if we are convinced this is a good thing to do.

Copy link
Copy Markdown
Collaborator Author

@zhenxiao zhenxiao Jan 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think RIGHT will be rewritten to LEFT when it comes here, since:

  1. dynamic filtering only applies to broadcast join, we already have session properties to turn it on/off
  2. the code here is a branch assuming dynamic filtering is on
  3. cost based optimizer is not used

SystemSessionProperties.isEnableDynamicFiltering is the session property to control dynamic filtering
The use case is:
SELECT o.orderkey FROM orders o RIGHT JOIN lineitem l ON l.orderkey + 1 = o.orderkey
The motivation to work on this feature it, when I was implementing comparison operator support for dynamic filtering, was trying to start from simple, so assuming left and right to be variable, and only for inner join. Actually, find support right join and expression for probe side is just a few lines changes, so submitted this PR.

List<CallExpression> clauses = getDynamicFilterClauses(node, equiJoinClauses, joinFilter, functionAndTypeManager);
List<VariableReferenceExpression> buildSymbols = clauses.stream()
.map(expression -> (VariableReferenceExpression) expression.getArguments().get(1))
Expand All @@ -641,10 +641,10 @@ private static DynamicFiltersResult createDynamicFilters(

ImmutableList.Builder<RowExpression> predicatesBuilder = ImmutableList.builder();
for (CallExpression expression : clauses) {
VariableReferenceExpression probeSymbol = (VariableReferenceExpression) expression.getArguments().get(0);
RowExpression probeExpression = expression.getArguments().get(0);
VariableReferenceExpression buildSymbol = (VariableReferenceExpression) expression.getArguments().get(1);
String id = buildSymbolToIdMap.get(buildSymbol);
RowExpression predicate = createDynamicFilterExpression(id, probeSymbol, functionAndTypeManager, expression.getDisplayName());
RowExpression predicate = createDynamicFilterExpression(id, probeExpression, functionAndTypeManager, expression.getDisplayName());
predicatesBuilder.add(predicate);
}
dynamicFilters = buildSymbolToIdMap.inverse();
Expand Down Expand Up @@ -732,24 +732,37 @@ private static Optional<CallExpression> getDynamicFilterComparison(
CallExpression call,
FunctionAndTypeManager functionAndTypeManager)
{
String function = call.getDisplayName();
Optional<OperatorType> operatorType = functionAndTypeManager.getFunctionMetadata(call.getFunctionHandle()).getOperatorType();
if (!operatorType.isPresent()) {
return Optional.empty();
}
OperatorType operator = operatorType.get();
List<RowExpression> arguments = call.getArguments();
RowExpression left = arguments.get(0);
RowExpression right = arguments.get(1);
boolean shouldFlip = false;
if (!(left instanceof VariableReferenceExpression && right instanceof VariableReferenceExpression)) {

// supported comparison for dynamic filtering: EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL
if (!operator.isComparisonOperator()) {
return Optional.empty();
}

OperatorType operator = OperatorType.valueOf(function);
if (!operator.isComparisonOperator() || operator == NOT_EQUAL || operator == IS_DISTINCT_FROM) {
if (operator == NOT_EQUAL || operator == IS_DISTINCT_FROM) {
return Optional.empty();
}

if (node.getRight().getOutputVariables().contains(left)) {
shouldFlip = true;
// supported expression for dynamic filtering:
// either 1. left child contains left variables and right child contains right variables
// or, 2. left child contains right variables and right child contains left variables
Set<VariableReferenceExpression> leftUniqueOutputs = VariablesExtractor.extractUnique(left);
Set<VariableReferenceExpression> rightUniqueOutputs = VariablesExtractor.extractUnique(right);
boolean leftChildContainsLeftVariables = node.getLeft().getOutputVariables().containsAll(leftUniqueOutputs);
boolean rightChildContainsRightVariables = node.getRight().getOutputVariables().containsAll(rightUniqueOutputs);
boolean leftChildContainsRightVariables = node.getLeft().getOutputVariables().containsAll(rightUniqueOutputs);
boolean rightChildContainsLeftVariables = node.getRight().getOutputVariables().containsAll(leftUniqueOutputs);
if (!((leftChildContainsLeftVariables && rightChildContainsRightVariables) || (leftChildContainsRightVariables && rightChildContainsLeftVariables))) {
return Optional.empty();
}
if (node.getLeft().getOutputVariables().contains(right)) {

boolean shouldFlip = false;
if (leftChildContainsRightVariables && rightChildContainsLeftVariables) {
shouldFlip = true;
}

Expand All @@ -759,6 +772,9 @@ private static Optional<CallExpression> getDynamicFilterComparison(
right = arguments.get(0);
}

if (!(right instanceof VariableReferenceExpression)) {
return Optional.empty();
}
return Optional.of(call(
operator.name(),
functionAndTypeManager.resolveOperator(operator, fromTypes(left.getType(), right.getType())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.LEFT;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.RIGHT;

public class TestDynamicFilter
extends BasePlanTest
Expand All @@ -61,6 +62,37 @@ public void testNonInnerJoin()
exchange(project(tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey")))))));
}

@Test
public void testRightEquiJoin()
{
assertPlan("SELECT o.orderkey FROM orders o RIGHT JOIN lineitem l ON l.orderkey = o.orderkey",
anyTree(
join(
RIGHT,
ImmutableList.of(equiJoinClause("ORDERS_OK", "LINEITEM_OK")),
anyTree(
tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey"))),
exchange(
project(
tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey")))))));
}

@Test
public void testRightEquiJoinWithLeftExpression()
{
assertPlan("SELECT o.orderkey FROM orders o RIGHT JOIN lineitem l ON l.orderkey + 1 = o.orderkey",
anyTree(
join(
RIGHT,
ImmutableList.of(equiJoinClause("ORDERS_OK", "expr")),
anyTree(
tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey"))),
anyTree(
project(
ImmutableMap.of("expr", expression("LINEITEM_OK + BIGINT '1'")),
tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey")))))));
}

@Test
public void testEmptyJoinCriteria()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,15 @@ public void testJoinWithInSubqueryToBeExecutedAsPostJoinFilter()
}
}

@Test
public void testOuterJoinWithExpression()
{
assertQuery("SELECT o.orderkey FROM orders o RIGHT JOIN lineitem l ON l.orderkey * 2 + 1 = o.orderkey");
assertQuery("SELECT o.orderkey FROM orders o RIGHT JOIN lineitem l ON l.orderkey * 5 - o.orderkey * 10 = 1");
assertQuery("SELECT o.orderkey FROM orders o LEFT JOIN lineitem l ON l.orderkey * 2 + 1 = o.orderkey");
assertQuery("SELECT o.orderkey FROM orders o LEFT JOIN lineitem l ON l.orderkey * 5 - o.orderkey * 10 = 1");
}

@Test
public void testOuterJoinWithComplexCorrelatedSubquery()
{
Expand Down