diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveDistributedJoinQueriesWithDynamicFiltering.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveDistributedJoinQueriesWithDynamicFiltering.java index 5edfabd5d54fb..493e81b3cd18f 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveDistributedJoinQueriesWithDynamicFiltering.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveDistributedJoinQueriesWithDynamicFiltering.java @@ -34,6 +34,7 @@ import static com.facebook.airlift.testing.Assertions.assertLessThanOrEqual; import static com.facebook.presto.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; +import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY; import static com.facebook.presto.hive.HiveQueryRunner.createQueryRunner; import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST; import static io.airlift.tpch.TpchTable.getTables; @@ -90,6 +91,25 @@ public void testJoinWithSelectiveBuildSide() assertLessThanOrEqual(probeStats.getInputPositions(), countRows("lineitem")); } + @Test + public void testJoinDynamicFilteringMultiJoin() + { + assertUpdate("CREATE TABLE t0 (k0 integer, v0 real)"); + assertUpdate("CREATE TABLE t1 (k1 integer, v1 real)"); + assertUpdate("CREATE TABLE t2 (k2 integer, v2 real)"); + assertUpdate("INSERT INTO t0 VALUES (1, 1.0)", 1); + assertUpdate("INSERT INTO t1 VALUES (1, 2.0)", 1); + assertUpdate("INSERT INTO t2 VALUES (1, 3.0)", 1); + + String query = "SELECT k0, k1, k2 FROM t0, t1, t2 WHERE (k0 = k1) AND (k0 = k2) AND (v0 + v1 = v2)"; + Session session = Session.builder(getSession()) + .setSystemProperty(ENABLE_DYNAMIC_FILTERING, "true") + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name()) + .setSystemProperty(JOIN_REORDERING_STRATEGY, FeaturesConfig.JoinReorderingStrategy.NONE.name()) + .build(); + assertQuery(session, query, "SELECT 1, 1, 1"); + } + private OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, String tableName) { DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RemoveUnsupportedDynamicFilters.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RemoveUnsupportedDynamicFilters.java index 5fdc57ec31cf8..12a141f408508 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RemoveUnsupportedDynamicFilters.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RemoveUnsupportedDynamicFilters.java @@ -44,6 +44,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -123,6 +124,9 @@ public PlanWithConsumedDynamicFilters visitJoin(JoinNode node, Set allow { JoinDynamicFilterResult joinDynamicFilterResult = extractDynamicFilterFromJoin(node, allowedDynamicFilterIds); if (!joinDynamicFilterResult.getProbe().equals(node.getLeft()) || !joinDynamicFilterResult.getBuild().equals(node.getRight()) || !joinDynamicFilterResult.getDynamicFilters().equals(node.getDynamicFilters())) { + Optional filter = node + .getFilter().map(this::removeAllDynamicFilters) // dynamic filtering is not supported for LookupJoinOperators + .filter(expression -> !expression.equals(TRUE_CONSTANT)); return new PlanWithConsumedDynamicFilters( new JoinNode( node.getId(), @@ -131,7 +135,7 @@ public PlanWithConsumedDynamicFilters visitJoin(JoinNode node, Set allow joinDynamicFilterResult.getBuild(), node.getCriteria(), node.getOutputVariables(), - node.getFilter(), + filter, node.getLeftHashVariable(), node.getRightHashVariable(), node.getDistributionType(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/DynamicFiltersChecker.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/DynamicFiltersChecker.java index 4e65ac1ca5c81..101fa1044a64c 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/DynamicFiltersChecker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/DynamicFiltersChecker.java @@ -29,6 +29,7 @@ import com.facebook.presto.sql.planner.plan.OutputNode; import com.facebook.presto.sql.planner.plan.SemiJoinNode; import com.facebook.presto.sql.relational.Expressions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import java.util.HashSet; @@ -73,6 +74,13 @@ public Set visitOutput(OutputNode node, Void context) @Override public Set visitJoin(JoinNode node, Void context) { + List nonPushedDownFilters = node + .getFilter() + .map(DynamicFilters::extractDynamicFilters) + .map(DynamicFilters.DynamicFilterExtractResult::getDynamicConjuncts) + .orElse(ImmutableList.of()); + verify(nonPushedDownFilters.isEmpty(), "Dynamic filters %s present in join's filter predicate were not pushed down.", nonPushedDownFilters); + return extractUnmatchedDynamicFilters(node, context); } @@ -86,14 +94,12 @@ private Set extractUnmatchedDynamicFilters(AbstractJoinNode node, Void c { Set currentJoinDynamicFilters = node.getDynamicFilters().keySet(); Set consumedProbeSide = node.getProbe().accept(this, context); - verify( - difference(currentJoinDynamicFilters, consumedProbeSide).isEmpty(), - "Dynamic filters present in join were not fully consumed by its probe side."); + Set unconsumedByProbeSide = difference(currentJoinDynamicFilters, consumedProbeSide); + verify(unconsumedByProbeSide.isEmpty(), "Dynamic filters %s present in join were not fully consumed by its probe side.", unconsumedByProbeSide); Set consumedBuildSide = node.getBuild().accept(this, context); - verify( - intersection(currentJoinDynamicFilters, consumedBuildSide).isEmpty(), - "Dynamic filters present in join were consumed by its build side."); + Set unconsumedByBuildSide = intersection(currentJoinDynamicFilters, consumedBuildSide); + verify(unconsumedByBuildSide.isEmpty(), "Dynamic filters %s present in join were consumed by its build side.", unconsumedByBuildSide); Set unmatched = new HashSet<>(consumedBuildSide); unmatched.addAll(consumedProbeSide); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/TestDynamicFilter.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestDynamicFilter.java index 2638ea8b09455..bebfaeca494ef 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/TestDynamicFilter.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestDynamicFilter.java @@ -29,6 +29,7 @@ import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.expression; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; @@ -373,4 +374,35 @@ public void testPredicateFromSourceSideNotPropagatesToFilterSideOfSemiJoinIfNotI project( tableScan("orders", ImmutableMap.of("ORDERS_ORDER_KEY", "orderkey"))))))); } + + @Test + public void testNonPushedDownJoinFilterRemoval() + { + assertPlan( + "SELECT 1 FROM part t0, part t1, part t2 " + + "WHERE t0.partkey = t1.partkey AND t0.partkey = t2.partkey " + + "AND t0.size + t1.size = t2.size", + anyTree( + join( + INNER, + ImmutableList.of(equiJoinClause("K0", "K2"), equiJoinClause("S", "V2")), + project( + project( + ImmutableMap.of("S", expression("V0 + V1")), + join( + INNER, + ImmutableList.of(equiJoinClause("K0", "K1")), + project( + node( + FilterNode.class, + tableScan("part", ImmutableMap.of("K0", "partkey", "V0", "size")))), + exchange( + project( + node( + FilterNode.class, + tableScan("part", ImmutableMap.of("K1", "partkey", "V1", "size")))))))), + exchange( + project( + tableScan("part", ImmutableMap.of("K2", "partkey", "V2", "size"))))))); + } } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestDynamicFiltersChecker.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestDynamicFiltersChecker.java index e4f85bd22cbda..d9b1dd6c4f67e 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestDynamicFiltersChecker.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestDynamicFiltersChecker.java @@ -82,7 +82,7 @@ public void setup() ordersTableScanNode = builder.tableScan(ordersTableHandle, ImmutableList.of(ordersOrderKeyVariable), ImmutableMap.of(ordersOrderKeyVariable, new TpchColumnHandle("orderkey", BIGINT))); } - @Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters present in join were not fully consumed by its probe side.") + @Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters \\[DF\\] present in join were not fully consumed by its probe side.") public void testUnconsumedDynamicFilterInJoin() { PlanNode root = builder.join( @@ -98,7 +98,7 @@ public void testUnconsumedDynamicFilterInJoin() validatePlan(root); } - @Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters present in join were consumed by its build side.") + @Test(expectedExceptions = VerifyException.class, expectedExceptionsMessageRegExp = "Dynamic filters \\[DF\\] present in join were consumed by its build side.") public void testDynamicFilterConsumedOnBuildSide() { PlanNode root = builder.join(