diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java index dc1a093fe1cdd..4bd71b46d8177 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java @@ -15,6 +15,7 @@ import com.facebook.presto.Session; import com.facebook.presto.spi.plan.JoinType; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueryFramework; @@ -56,6 +57,12 @@ protected QueryRunner createQueryRunner() Optional.empty()); } + @Override + protected FeaturesConfig createFeaturesConfig() + { + return new FeaturesConfig().setNativeExecutionEnabled(true); + } + @Test public void testJoinType() { @@ -83,19 +90,19 @@ public void testJoinType() assertPlan( mergeJoinEnabled(), "select * from test_join_customer_join_type left join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey", - joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, false)); + joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, true)); // Right join assertPlan( mergeJoinEnabled(), "select * from test_join_customer_join_type right join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey", - joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), RIGHT, false)); + joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), RIGHT, true)); // Outer join assertPlan( mergeJoinEnabled(), "select * from test_join_customer_join_type full join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey", - joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), FULL, false)); + joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), FULL, true)); } finally { queryRunner.execute("DROP TABLE IF EXISTS test_join_customer_join_type"); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java new file mode 100644 index 0000000000000..5f5be3c3c9c85 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlanPrestoOnSpark.java @@ -0,0 +1,423 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.Session; +import com.facebook.presto.spi.plan.JoinType; +import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION; +import static com.facebook.presto.SystemSessionProperties.PREFER_MERGE_JOIN_FOR_SORTED_INPUTS; +import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG; +import static com.facebook.presto.hive.HiveSessionProperties.ORDER_BASED_EXECUTION_ENABLED; +import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED; +import static com.facebook.presto.spi.plan.JoinType.FULL; +import static com.facebook.presto.spi.plan.JoinType.INNER; +import static com.facebook.presto.spi.plan.JoinType.LEFT; +import static com.facebook.presto.spi.plan.JoinType.RIGHT; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.mergeJoin; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.sort; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan; +import static io.airlift.tpch.TpchTable.CUSTOMER; +import static io.airlift.tpch.TpchTable.LINE_ITEM; +import static io.airlift.tpch.TpchTable.NATION; +import static io.airlift.tpch.TpchTable.ORDERS; + +public class TestMergeJoinPlanPrestoOnSpark + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.createQueryRunner( + ImmutableList.of(ORDERS, LINE_ITEM, CUSTOMER, NATION), + ImmutableMap.of(), + Optional.empty()); + } + + @Override + protected FeaturesConfig createFeaturesConfig() + { + return new FeaturesConfig().setNativeExecutionEnabled(true).setPrestoSparkExecutionEnvironment(true); + } + + @Test + public void testJoinType() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer_join_type WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order_join_type WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); + + // When merge join session property is turned on and data properties requirements for merge join are met + // Inner join + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer_join_type join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey", + joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), INNER, true)); + + // Left join + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer_join_type left join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey", + joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), LEFT, true)); + + // Right join + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer_join_type right join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey", + joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), RIGHT, true)); + + // Outer join + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer_join_type full join test_join_order_join_type on test_join_customer_join_type.custkey = test_join_order_join_type.custkey", + joinPlan("test_join_customer_join_type", "test_join_order_join_type", ImmutableList.of("custkey"), ImmutableList.of("custkey"), FULL, true)); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer_join_type"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order_join_type"); + } + } + + @Test + public void testSessionProperty() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); + + // By default, we can't enable merge join + assertPlan( + "select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey", + joinPlan("test_join_customer", "test_join_order", ImmutableList.of("custkey"), ImmutableList.of("custkey"), INNER, false)); + + // when we miss session property, we can't enable merge join + assertPlan( + "select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey", + joinPlan("test_join_customer", "test_join_order", ImmutableList.of("custkey"), ImmutableList.of("custkey"), INNER, false)); + + // When merge join session property is turned on and data properties requirements for merge join are met + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey", + joinPlan("test_join_customer", "test_join_order", ImmutableList.of("custkey"), ImmutableList.of("custkey"), INNER, true)); + + // Presto on spark does not need the grouped execution to be enabled + assertPlan( + groupedExecutionDisabled(), + "select * from test_join_customer join test_join_order on test_join_customer.custkey = test_join_order.custkey", + joinPlan("test_join_customer", "test_join_order", ImmutableList.of("custkey"), ImmutableList.of("custkey"), INNER, true)); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order"); + } + } + + @Test + public void testDifferentBucketedByKey() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer2 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['name'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order2 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); + + // merge join can't be enabled + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer2 join test_join_order2 on test_join_customer2.custkey = test_join_order2.custkey", + anyTree( + mergeJoin(INNER, ImmutableList.of(equiJoinClause("custkey_l", "custkey_r")), + Optional.empty(), + anyTree(sort(anyTree(tableScan("test_join_customer2", ImmutableMap.of("custkey_l", "custkey"))))), + tableScan("test_join_order2", ImmutableMap.of("custkey_r", "custkey"))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer2"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order2"); + } + } + + @Test + public void testDifferentSortByKey() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer3 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['name'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order3 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); + + // merge join can be enabled when only one side is sorted (Presto on Spark behavior) + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer3 join test_join_order3 on test_join_customer3.custkey = test_join_order3.custkey", + anyTree( + mergeJoin(INNER, ImmutableList.of(equiJoinClause("custkey_l", "custkey_r")), + Optional.empty(), + anyTree(sort(tableScan("test_join_customer3", ImmutableMap.of("custkey_l", "custkey")))), + tableScan("test_join_order3", ImmutableMap.of("custkey_r", "custkey"))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer3"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order3"); + } + } + + @Test + public void testMultipleSortByKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer4 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order4 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); + + // merge join can be enabled + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer4 join test_join_order4 on test_join_customer4.custkey = test_join_order4.custkey", + joinPlan("test_join_customer4", "test_join_order4", ImmutableList.of("custkey"), ImmutableList.of("custkey"), INNER, true)); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer4"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order4"); + } + } + + @Test + public void testMultipleJoinKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer5(" + + " \"custkey\" bigint, \"name\" varchar(25), \"address\" varchar(40), \"orderkey\" bigint, \"phone\" varchar(15), \n" + + " \"acctbal\" double, \"mktsegment\" varchar(10), \"comment\" varchar(117), \"ds\" varchar(10)) WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'orderkey'], \n" + + " sorted_by = ARRAY['custkey', 'orderkey'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' )"); + queryRunner.execute("INSERT INTO test_join_customer5 \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order5(" + + " \"orderkey\" bigint, \"custkey\" bigint, \"orderstatus\" varchar(1), \"totalprice\" double, \"orderdate\" date," + + " \"orderpriority\" varchar(15), \"clerk\" varchar(15), \"shippriority\" integer, \"comment\" varchar(79), \"ds\" varchar(10)) WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'orderkey'], \n" + + " sorted_by = ARRAY['custkey', 'orderkey'], partitioned_by=array['ds'])"); + queryRunner.execute("INSERT INTO test_join_order5 \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.orders LIMIT 1000"); + + // merge join can be enabled + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer5 join test_join_order5 on test_join_customer5.custkey = test_join_order5.custkey and test_join_customer5.orderkey = test_join_order5.orderkey", + joinPlan("test_join_customer5", "test_join_order5", ImmutableList.of("custkey", "orderkey"), ImmutableList.of("custkey", "orderkey"), INNER, true)); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer5"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order5"); + } + } + + @Test + public void testMultiplePartitions() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer_multi_partitions WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + queryRunner.execute("INSERT INTO test_join_customer_multi_partitions \n" + + "SELECT *, '2021-07-12' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order_multi_partitions WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); + queryRunner.execute("INSERT INTO test_join_order_multi_partitions \n" + + "SELECT *, '2021-07-12' as ds FROM tpch.sf1.orders LIMIT 1000"); + + // When partition key does not appear in join keys and we query multiple partitions, we can't enable merge join + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer_multi_partitions join test_join_order_multi_partitions on test_join_customer_multi_partitions.custkey = test_join_order_multi_partitions.custkey", + joinPlan("test_join_customer_multi_partitions", "test_join_order_multi_partitions", ImmutableList.of("custkey"), ImmutableList.of("custkey"), INNER, false)); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer_multi_partitions"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order_multi_partitions"); + } + } + + @Test + public void testBothSidesNotBucketed() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer_not_bucketed WITH ( \n" + + " partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order_not_bucketed WITH ( \n" + + " partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); + + // merge join can't be enabled when both sides are not bucketed + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer_not_bucketed join test_join_order_not_bucketed on test_join_customer_not_bucketed.custkey = test_join_order_not_bucketed.custkey", + joinPlan("test_join_customer_not_bucketed", "test_join_order_not_bucketed", ImmutableList.of("custkey"), ImmutableList.of("custkey"), INNER, false)); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer_not_bucketed"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order_not_bucketed"); + } + } + + @Test + public void testOnlyOneSideBucketedAndSorted() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_join_customer_not_bucketed_sorted WITH ( \n" + + " partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.customer LIMIT 1000"); + + queryRunner.execute("CREATE TABLE test_join_order_bucketed_sorted WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds']) AS \n" + + "SELECT *, '2021-07-11' as ds FROM tpch.sf1.\"orders\" LIMIT 1000"); + + // merge join can be enabled when only one side is bucketed and sorted + assertPlan( + mergeJoinEnabled(), + "select * from test_join_customer_not_bucketed_sorted join test_join_order_bucketed_sorted on test_join_customer_not_bucketed_sorted.custkey = test_join_order_bucketed_sorted.custkey", + anyTree( + mergeJoin(INNER, ImmutableList.of(equiJoinClause("custkey_l", "custkey_r")), + Optional.empty(), + anyTree(sort(anyTree(tableScan("test_join_customer_not_bucketed_sorted", ImmutableMap.of("custkey_l", "custkey"))))), + tableScan("test_join_order_bucketed_sorted", ImmutableMap.of("custkey_r", "custkey"))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_join_customer_not_bucketed_sorted"); + queryRunner.execute("DROP TABLE IF EXISTS test_join_order_bucketed_sorted"); + } + } + + private Session groupedExecutionDisabled() + { + return Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(PREFER_MERGE_JOIN_FOR_SORTED_INPUTS, "true") + .setSystemProperty(GROUPED_EXECUTION, "false") + .setCatalogSessionProperty(HIVE_CATALOG, ORDER_BASED_EXECUTION_ENABLED, "true") + .build(); + } + + private Session mergeJoinEnabled() + { + return Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(PREFER_MERGE_JOIN_FOR_SORTED_INPUTS, "true") + .setSystemProperty(GROUPED_EXECUTION, "true") + .setCatalogSessionProperty(HIVE_CATALOG, ORDER_BASED_EXECUTION_ENABLED, "true") + .build(); + } + + private PlanMatchPattern joinPlan(String leftTableName, String rightTableName, List leftJoinKeys, List rightJoinKeys, JoinType joinType, boolean mergeJoinEnabled) + { + int suffix1 = 0; + int suffix2 = 1; + ImmutableMap.Builder leftColumnReferencesBuilder = ImmutableMap.builder(); + ImmutableMap.Builder rightColumnReferencesBuilder = ImmutableMap.builder(); + ImmutableList.Builder joinClauses = ImmutableList.builder(); + for (int i = 0; i < leftJoinKeys.size(); i++) { + leftColumnReferencesBuilder.put(leftJoinKeys.get(i) + suffix1, leftJoinKeys.get(i)); + rightColumnReferencesBuilder.put(rightJoinKeys.get(i) + suffix2, rightJoinKeys.get(i)); + joinClauses.add(equiJoinClause(leftJoinKeys.get(i) + suffix1, rightJoinKeys.get(i) + suffix2)); + suffix1 = suffix1 + 2; + suffix2 = suffix2 + 2; + } + + return mergeJoinEnabled ? + anyTree(mergeJoin( + joinType, + joinClauses.build(), + Optional.empty(), + PlanMatchPattern.tableScan(leftTableName, leftColumnReferencesBuilder.build()), + PlanMatchPattern.tableScan(rightTableName, rightColumnReferencesBuilder.build()))) : + anyTree(join( + joinType, + joinClauses.build(), + Optional.empty(), + Optional.of(PARTITIONED), + anyTree(PlanMatchPattern.tableScan(leftTableName, leftColumnReferencesBuilder.build())), + anyTree(PlanMatchPattern.tableScan(rightTableName, rightColumnReferencesBuilder.build())))); + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java index 71d1199bf9da8..ab60b754009bc 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/GroupedExecutionTagger.java @@ -39,7 +39,6 @@ import java.util.OptionalInt; import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION; -import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.preferSortMergeJoin; import static com.facebook.presto.spi.StandardErrorCode.INVALID_PLAN_ERROR; import static com.facebook.presto.spi.connector.ConnectorCapabilities.SUPPORTS_PAGE_SINK_COMMIT; @@ -58,13 +57,15 @@ class GroupedExecutionTagger private final Metadata metadata; private final NodePartitioningManager nodePartitioningManager; private final boolean groupedExecutionEnabled; + private final boolean isPrestoOnSpark; - public GroupedExecutionTagger(Session session, Metadata metadata, NodePartitioningManager nodePartitioningManager) + public GroupedExecutionTagger(Session session, Metadata metadata, NodePartitioningManager nodePartitioningManager, boolean groupedExecutionEnabled, boolean isPrestoOnSpark) { this.session = requireNonNull(session, "session is null"); this.metadata = requireNonNull(metadata, "metadata is null"); this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null"); - this.groupedExecutionEnabled = isGroupedExecutionEnabled(session); + this.groupedExecutionEnabled = groupedExecutionEnabled; + this.isPrestoOnSpark = isPrestoOnSpark; } @Override @@ -166,6 +167,15 @@ public GroupedExecutionTagger.GroupedExecutionProperties visitMergeJoin(MergeJoi // TODO: This will break the other use case for merge join operating on sorted tables, which requires grouped execution for correctness. return GroupedExecutionTagger.GroupedExecutionProperties.notCapable(); } + + if (isPrestoOnSpark) { + GroupedExecutionTagger.GroupedExecutionProperties mergeJoinLeft = node.getLeft().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager, true, true), null); + GroupedExecutionTagger.GroupedExecutionProperties mergeJoinRight = node.getRight().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager, true, true), null); + if (mergeJoinLeft.currentNodeCapable || mergeJoinRight.currentNodeCapable) { + return GroupedExecutionTagger.GroupedExecutionProperties.notCapable(); + } + } + throw new PrestoException( INVALID_PLAN_ERROR, format("When grouped execution can't be enabled, merge join plan is not valid." + diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index 07056c735f195..e526fdf4ed573 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -52,6 +52,7 @@ public class PlanFragmenter private final QueryManagerConfig config; private final PlanChecker distributedPlanChecker; private final PlanChecker singleNodePlanChecker; + private final boolean isPrestoOnSpark; @Inject public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager) @@ -61,6 +62,7 @@ public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitionin this.config = requireNonNull(queryManagerConfig, "queryManagerConfig is null"); this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false, planCheckerProviderManager); this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true, planCheckerProviderManager); + this.isPrestoOnSpark = featuresConfig.isPrestoSparkExecutionEnvironment(); } public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) @@ -90,7 +92,7 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, Pl PlanNode root = SimplePlanRewriter.rewriteWith(fragmenter, plan.getRoot(), properties); SubPlan subPlan = fragmenter.buildRootFragment(root, properties); - return finalizeSubPlan(subPlan, config, metadata, nodePartitioningManager, session, noExchange, warningCollector, subPlan.getFragment().getPartitioning()); + return finalizeSubPlan(subPlan, config, metadata, nodePartitioningManager, session, noExchange, warningCollector, subPlan.getFragment().getPartitioning(), isPrestoOnSpark); } private static class Fragmenter diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java index 41c0f15975284..d603499e40762 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java @@ -46,6 +46,7 @@ import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy; import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; +import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; @@ -92,12 +93,13 @@ public static SubPlan finalizeSubPlan( Session session, boolean noExchange, WarningCollector warningCollector, - PartitioningHandle partitioningHandle) + PartitioningHandle partitioningHandle, + boolean isPrestoOnSpark) { subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle); if (!noExchange && !isSingleNodeExecutionEnabled(session)) { // grouped execution is not supported for SINGLE_DISTRIBUTION or SINGLE_NODE_EXECUTION_ENABLED - subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager); + subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager, isPrestoOnSpark); } checkState(subPlan.getFragment().getId().getId() != ROOT_FRAGMENT_ID || !isForceSingleNodeOutput(session) || subPlan.getFragment().getPartitioning().isSingleNode(), "Root of PlanFragment is not single node"); @@ -148,10 +150,10 @@ private static void sanityCheckFragmentedPlan( * TODO: We should introduce "query section" and make recoverability analysis done at query section level. */ - private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan, boolean parentContainsTableFinish, Metadata metadata, NodePartitioningManager nodePartitioningManager) + private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan, boolean parentContainsTableFinish, Metadata metadata, NodePartitioningManager nodePartitioningManager, boolean isPrestoOnSpark) { PlanFragment fragment = subPlan.getFragment(); - GroupedExecutionTagger.GroupedExecutionProperties properties = fragment.getRoot().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager), null); + GroupedExecutionTagger.GroupedExecutionProperties properties = fragment.getRoot().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager, isGroupedExecutionEnabled(session), isPrestoOnSpark), null); if (properties.isSubTreeUseful()) { boolean preferDynamic = fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE) && new HashSet<>(properties.getCapableTableScanNodes()).containsAll(fragment.getTableScanSchedulingOrder()); @@ -185,7 +187,7 @@ private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan, ImmutableList.Builder result = ImmutableList.builder(); boolean containsTableFinishNode = containsTableFinishNode(fragment); for (SubPlan child : subPlan.getChildren()) { - result.add(analyzeGroupedExecution(session, child, containsTableFinishNode, metadata, nodePartitioningManager)); + result.add(analyzeGroupedExecution(session, child, containsTableFinishNode, metadata, nodePartitioningManager, isPrestoOnSpark)); } return new SubPlan(fragment, result.build()); } diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java index 2d7c8be053645..c46c666c105df 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java @@ -947,7 +947,7 @@ public PlanOptimizers( // MergeJoinForSortedInputOptimizer can avoid the local exchange for a join operation // Should be placed after AddExchanges, but before AddLocalExchange // To replace the JoinNode to MergeJoin ahead of AddLocalExchange to avoid adding extra local exchange - builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()), + builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled(), featuresConfig.isPrestoSparkExecutionEnvironment()), new SortMergeJoinOptimizer(metadata, featuresConfig.isNativeExecutionEnabled())); // Optimizers above this don't understand local exchanges, so be careful moving this. diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java index fc1735f7e7e70..f8edfeb37e69b 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java @@ -20,19 +20,24 @@ import com.facebook.presto.spi.plan.EquiJoinClause; import com.facebook.presto.spi.plan.JoinNode; import com.facebook.presto.spi.plan.MergeJoinNode; +import com.facebook.presto.spi.plan.Ordering; +import com.facebook.presto.spi.plan.OrderingScheme; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.SortNode; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.sql.planner.TypeProvider; import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; +import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Optional; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.preferMergeJoinForSortedInputs; import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_FIRST; -import static com.facebook.presto.spi.plan.JoinType.INNER; +import static com.facebook.presto.sql.planner.plan.ChildReplacer.replaceChildren; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; @@ -41,12 +46,14 @@ public class MergeJoinForSortedInputOptimizer { private final Metadata metadata; private final boolean nativeExecution; + private final boolean prestoOnSpark; private boolean isEnabledForTesting; - public MergeJoinForSortedInputOptimizer(Metadata metadata, boolean nativeExecution) + public MergeJoinForSortedInputOptimizer(Metadata metadata, boolean nativeExecution, boolean prestoOnSpark) { this.metadata = requireNonNull(metadata, "metadata is null"); this.nativeExecution = nativeExecution; + this.prestoOnSpark = prestoOnSpark; } @Override @@ -58,7 +65,7 @@ public void setEnabledForTesting(boolean isSet) @Override public boolean isEnabled(Session session) { - return isEnabledForTesting || isGroupedExecutionEnabled(session) && preferMergeJoinForSortedInputs(session) && !isSingleNodeExecutionEnabled(session); + return isEnabledForTesting || nativeExecution && (isGroupedExecutionEnabled(session) || prestoOnSpark) && preferMergeJoinForSortedInputs(session) && !isSingleNodeExecutionEnabled(session); } @Override @@ -70,7 +77,7 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider requireNonNull(idAllocator, "idAllocator is null"); if (isEnabled(session)) { - Rewriter rewriter = new MergeJoinForSortedInputOptimizer.Rewriter(variableAllocator, idAllocator, metadata, session); + Rewriter rewriter = new MergeJoinForSortedInputOptimizer.Rewriter(idAllocator, metadata, session, prestoOnSpark); PlanNode rewrittenPlan = SimplePlanRewriter.rewriteWith(rewriter, plan, null); return PlanOptimizerResult.optimizerResult(rewrittenPlan, rewriter.isPlanChanged()); } @@ -83,15 +90,15 @@ private class Rewriter private final PlanNodeIdAllocator idAllocator; private final Metadata metadata; private final Session session; - private final TypeProvider types; + private final boolean prestoOnSpark; private boolean planChanged; - private Rewriter(VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, Metadata metadata, Session session) + private Rewriter(PlanNodeIdAllocator idAllocator, Metadata metadata, Session session, boolean prestoOnSpark) { this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); this.metadata = requireNonNull(metadata, "metadata is null"); this.session = requireNonNull(session, "session is null"); - this.types = TypeProvider.viewOf(variableAllocator.getVariables()); + this.prestoOnSpark = prestoOnSpark; } public boolean isPlanChanged() @@ -102,62 +109,63 @@ public boolean isPlanChanged() @Override public PlanNode visitJoin(JoinNode node, RewriteContext context) { - // As of now, we only support inner join for merge join - if (node.getType() != INNER) { - return node; + PlanNode rewrittenLeft = node.getLeft().accept(this, context); + PlanNode rewrittenRight = node.getRight().accept(this, context); + + boolean leftInputSorted = isPlanOutputSortedByColumns(rewrittenLeft, node.getCriteria().stream().map(EquiJoinClause::getLeft).collect(toImmutableList())); + boolean rightInputSorted = isPlanOutputSortedByColumns(rewrittenRight, node.getCriteria().stream().map(EquiJoinClause::getRight).collect(toImmutableList())); + + if ((!leftInputSorted && !rightInputSorted) || (!prestoOnSpark && (!leftInputSorted || !rightInputSorted))) { + return replaceChildren(node, ImmutableList.of(rewrittenLeft, rewrittenRight)); } - // Fast path merge join optimization (no sort, no local merge) - - // For example: when we have a plan that looks like: - // JoinNode - //- TableScanA - //- TableScanB - - // We check the data properties of TableScanA and TableScanB to see if they meet requirements for merge join: - // 1. If so, we replace the JoinNode to MergeJoinNode - // MergeJoinNode - //- TableScanA - //- TableScanB - - // 2. If not, we don't optimize - if (meetsDataRequirement(node.getLeft(), node.getRight(), node)) { - planChanged = true; - return new MergeJoinNode( - node.getSourceLocation(), - node.getId(), - node.getType(), - node.getLeft(), - node.getRight(), - node.getCriteria(), - node.getOutputVariables(), - node.getFilter(), - node.getLeftHashVariable(), - node.getRightHashVariable()); + if (!leftInputSorted) { + List leftOrdering = node.getCriteria().stream() + .map(criterion -> new Ordering(criterion.getLeft(), ASC_NULLS_FIRST)) + .collect(toImmutableList()); + rewrittenLeft = new SortNode( + Optional.empty(), + idAllocator.getNextId(), + rewrittenLeft, + new OrderingScheme(leftOrdering), + true, + ImmutableList.of()); } - return node; + if (!rightInputSorted) { + List rightOrdering = node.getCriteria().stream() + .map(criterion -> new Ordering(criterion.getRight(), ASC_NULLS_FIRST)) + .collect(toImmutableList()); + rewrittenRight = new SortNode( + Optional.empty(), + idAllocator.getNextId(), + rewrittenRight, + new OrderingScheme(rightOrdering), + true, + ImmutableList.of()); + } + planChanged = true; + return new MergeJoinNode( + node.getSourceLocation(), + node.getId(), + node.getType(), + rewrittenLeft, + rewrittenRight, + node.getCriteria(), + node.getOutputVariables(), + node.getFilter(), + Optional.empty(), + Optional.empty()); } - private boolean meetsDataRequirement(PlanNode left, PlanNode right, JoinNode node) + private boolean isPlanOutputSortedByColumns(PlanNode plan, List columns) { - // Acquire data properties for both left and right side - StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session, nativeExecution); - StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session, nativeExecution); - - List leftJoinColumns = node.getCriteria().stream().map(EquiJoinClause::getLeft).collect(toImmutableList()); - List rightJoinColumns = node.getCriteria().stream() - .map(EquiJoinClause::getRight) - .collect(toImmutableList()); - - // Check if both the left side and right side's partitioning columns (bucketed-by columns [B]) are a subset of join columns [J] - // B = subset (J) - if (!verifyStreamProperties(leftProperties, leftJoinColumns) || !verifyStreamProperties(rightProperties, rightJoinColumns)) { + StreamPropertyDerivations.StreamProperties properties = StreamPropertyDerivations.derivePropertiesRecursively(plan, metadata, session, nativeExecution); + + if (!verifyStreamProperties(properties, columns)) { return false; } - // Check if the left side and right side are both ordered by the join columns - return !LocalProperties.match(rightProperties.getLocalProperties(), LocalProperties.sorted(rightJoinColumns, ASC_NULLS_FIRST)).get(0).isPresent() && - !LocalProperties.match(leftProperties.getLocalProperties(), LocalProperties.sorted(leftJoinColumns, ASC_NULLS_FIRST)).get(0).isPresent(); + return !LocalProperties.match(properties.getLocalProperties(), LocalProperties.sorted(columns, ASC_NULLS_FIRST)).get(0).isPresent(); } private boolean verifyStreamProperties(StreamPropertyDerivations.StreamProperties streamProperties, List joinColumns) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.java index 2076e695cbf90..e4e960cddad54 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.java @@ -240,7 +240,8 @@ private IterativePlanFragmenter createIterativePlanFragmenter(PlanCheckerProvide this.queryManagerConfig, this.session, this.warningCollector, - noExchange); + noExchange, + true); } @Override diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/IterativePlanFragmenter.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/IterativePlanFragmenter.java index cce0a36deb0da..270ab79d7cf25 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/IterativePlanFragmenter.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/planner/IterativePlanFragmenter.java @@ -93,6 +93,7 @@ public class IterativePlanFragmenter private final Session session; private final WarningCollector warningCollector; private final boolean noExchange; + private final boolean isPrestoOnSpark; // Fragment numbers need to be unique across the whole query, // so keep it in this top-level class. @@ -115,7 +116,8 @@ public IterativePlanFragmenter( QueryManagerConfig queryManagerConfig, Session session, WarningCollector warningCollector, - boolean noExchange) + boolean noExchange, + boolean isPrestoOnSpark) { this.originalPlan = requireNonNull(originalPlan, "originalPlan is null"); this.isFragmentFinished = requireNonNull(isFragmentFinished, "isSourceReady is null"); @@ -128,6 +130,7 @@ public IterativePlanFragmenter( this.session = requireNonNull(session, "session is null"); this.warningCollector = requireNonNull(warningCollector, "warningCollector is null"); this.noExchange = noExchange; + this.isPrestoOnSpark = isPrestoOnSpark; } /** @@ -181,7 +184,7 @@ public PlanAndFragments createReadySubPlans(PlanNode plan) // and rewriting the partition handle PartitioningHandle partitioningHandle = properties.getPartitioningHandle(); subPlans = subPlans.stream() - .map(subPlan -> finalizeSubPlan(subPlan, queryManagerConfig, metadata, nodePartitioningManager, session, noExchange, warningCollector, partitioningHandle)) + .map(subPlan -> finalizeSubPlan(subPlan, queryManagerConfig, metadata, nodePartitioningManager, session, noExchange, warningCollector, partitioningHandle, isPrestoOnSpark)) .collect(toImmutableList()); return new PlanAndFragments(remainingPlan, subPlans); diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java index 250b970b511ad..5f46474c16ae0 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java @@ -237,7 +237,8 @@ private Void runTestIterativePlanFragmenter(PlanNode node, Plan plan, SubPlan fu new QueryManagerConfig(), session, WarningCollector.NOOP, - false); + false, + true); PlanAndFragments nextPlanAndFragments = getNextPlanAndFragments(iterativePlanFragmenter, node); assertTrue(nextPlanAndFragments.getRemainingPlan().isPresent());