diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index 6d8e2de1b7b16..f0a61e3ce6821 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -223,6 +223,7 @@ import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount; import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedPartitionUpdateSerializationEnabled; +import static com.facebook.presto.hive.HiveSessionProperties.isOrderBasedExecutionEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isParquetPushdownFilterEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isPreferManifestsToListFiles; import static com.facebook.presto.hive.HiveSessionProperties.isRespectTableFormat; @@ -230,7 +231,6 @@ import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled; -import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType; import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable; import static com.facebook.presto.hive.HiveStorageFormat.AVRO; @@ -2819,36 +2819,33 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa predicate = createPredicate(partitionColumns, partitions); } - // Expose ordering property of the table. 
- ImmutableList.Builder> localProperties = ImmutableList.builder(); + // Expose ordering property of the table when order based execution is enabled. + ImmutableList.Builder> localPropertyBuilder = ImmutableList.builder(); Optional> streamPartitionColumns = Optional.empty(); - if (table.getStorage().getBucketProperty().isPresent() && !table.getStorage().getBucketProperty().get().getSortedBy().isEmpty()) { + if (table.getStorage().getBucketProperty().isPresent() + && !table.getStorage().getBucketProperty().get().getSortedBy().isEmpty() + && isOrderBasedExecutionEnabled(session)) { ImmutableSet.Builder streamPartitionColumnsBuilder = ImmutableSet.builder(); + Map columnHandles = hiveColumnHandles(table).stream() + .collect(toImmutableMap(HiveColumnHandle::getName, identity())); // streamPartitioningColumns is how we partition the data across splits. // localProperty is how we partition the data within a split. - // 1. add partition columns to streamPartitionColumns - partitionColumns.forEach(streamPartitionColumnsBuilder::add); - // 2. add sorted columns to streamPartitionColumns and localProperties - HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty().get(); - Map columnHandles = hiveColumnHandles(table).stream() - .collect(toImmutableMap(HiveColumnHandle::getName, identity())); - bucketProperty.getSortedBy().forEach(sortingColumn -> { - ColumnHandle columnHandle = columnHandles.get(sortingColumn.getColumnName()); - localProperties.add(new SortingProperty<>(columnHandle, sortingColumn.getOrder().getSortOrder())); + // 1. add partition columns and bucketed-by columns to streamPartitionColumns + // When order-based execution is enabled, file splitting is disabled and, when the table is bucketed, data is sharded across splits. 
+ partitionColumns.forEach(streamPartitionColumnsBuilder::add); + table.getStorage().getBucketProperty().get().getBucketedBy().forEach(bucketedByColumn -> { + ColumnHandle columnHandle = columnHandles.get(bucketedByColumn); streamPartitionColumnsBuilder.add(columnHandle); }); - // We currently only set streamPartitionColumns when it enables streaming aggregation and also it's eligible to enable streaming aggregation - // 1. When the bucket columns are the same as the prefix of the sort columns - // 2. When all rows of the same value group are guaranteed to be in the same split. We disable splitting a file when isStreamingAggregationEnabled is true to make sure the property is guaranteed. - List sortColumns = bucketProperty.getSortedBy().stream().map(SortingColumn::getColumnName).collect(toImmutableList()); - if (bucketProperty.getBucketedBy().size() <= sortColumns.size() - && bucketProperty.getBucketedBy().containsAll(sortColumns.subList(0, bucketProperty.getBucketedBy().size())) - && isStreamingAggregationEnabled(session)) { - streamPartitionColumns = Optional.of(streamPartitionColumnsBuilder.build()); - } + // 2. 
add sorted-by columns to localPropertyBuilder + table.getStorage().getBucketProperty().get().getSortedBy().forEach(sortingColumn -> { + ColumnHandle columnHandle = columnHandles.get(sortingColumn.getColumnName()); + localPropertyBuilder.add(new SortingProperty<>(columnHandle, sortingColumn.getOrder().getSortOrder())); + }); + streamPartitionColumns = Optional.of(streamPartitionColumnsBuilder.build()); } return new ConnectorTableLayout( @@ -2858,7 +2855,7 @@ && isStreamingAggregationEnabled(session)) { tablePartitioning, streamPartitionColumns, discretePredicates, - localProperties.build(), + localPropertyBuilder.build(), Optional.of(hiveLayoutHandle.getRemainingPredicate())); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 532be9072ca7d..b16d9497aec58 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -899,7 +899,7 @@ public static boolean isS3SelectPushdownEnabled(ConnectorSession session) return session.getProperty(S3_SELECT_PUSHDOWN_ENABLED, Boolean.class); } - public static boolean isStreamingAggregationEnabled(ConnectorSession session) + public static boolean isOrderBasedExecutionEnabled(ConnectorSession session) { return session.getProperty(ORDER_BASED_EXECUTION_ENABLED, Boolean.class); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java index 5657e88c4a724..7e0783f206858 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java @@ -63,7 +63,7 @@ import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize; import static 
com.facebook.presto.hive.HiveSessionProperties.getNodeSelectionStrategy; import static com.facebook.presto.hive.HiveSessionProperties.isFileSplittable; -import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled; +import static com.facebook.presto.hive.HiveSessionProperties.isOrderBasedExecutionEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache; import static com.facebook.presto.hive.HiveUtil.getFooterCount; import static com.facebook.presto.hive.HiveUtil.getHeaderCount; @@ -267,7 +267,7 @@ public ListenableFuture loadPartition(HivePartitionMetadata partition, HiveSp // therefore we must not split files when either is enabled. // Skip header / footer lines are not splittable except for a special case when skip.header.line.count=1 boolean splittable = isFileSplittable(session) && - !isStreamingAggregationEnabled(session) && + !isOrderBasedExecutionEnabled(session) && !s3SelectPushdownEnabled && !partialAggregationsPushedDown && getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1; diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestSegmentedAggregation.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestSegmentedAggregation.java new file mode 100644 index 0000000000000..7cbbbf89f4fd6 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestSegmentedAggregation.java @@ -0,0 +1,179 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.hive; + +import com.facebook.presto.Session; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static com.facebook.presto.SystemSessionProperties.SEGMENTED_AGGREGATION_ENABLED; +import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG; +import static com.facebook.presto.hive.HiveSessionProperties.ORDER_BASED_EXECUTION_ENABLED; +import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.functionCall; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.singleGroupingSet; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan; +import static io.airlift.tpch.TpchTable.CUSTOMER; +import static io.airlift.tpch.TpchTable.LINE_ITEM; +import static io.airlift.tpch.TpchTable.NATION; +import static io.airlift.tpch.TpchTable.ORDERS; + +public class TestSegmentedAggregation + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.createQueryRunner( + ImmutableList.of(ORDERS, LINE_ITEM, CUSTOMER, NATION), + ImmutableMap.of("experimental.pushdown-subfields-enabled", "true"), + Optional.empty()); + } + + @Test + public void testSortedbyKeysPrefixNotASubsetOfGroupbyKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer0 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['name', 'custkey'], 
partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // can't enable segmented aggregation + assertPlan(orderBasedExecutionEnabled(), + "SELECT custkey, count(name) FROM test_segmented_aggregation_customer0 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1", + anyTree(aggregation( + singleGroupingSet("custkey"), + ImmutableMap.of(Optional.of("count"), functionCall("count", ImmutableList.of("name"))), + ImmutableList.of(), // no segmented streaming + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_segmented_aggregation_customer0", ImmutableMap.of("custkey", "custkey", "name", "name"))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer0"); + } + } + + @Test + public void testAndSortedByKeysArePrefixOfGroupbyKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" + + " sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + assertPlan( + orderBasedExecutionEnabled(), + "SELECT custkey, name, nationkey, COUNT(*) FROM test_segmented_aggregation_customer \n" + + "WHERE ds = '2021-07-11' GROUP BY 1, 2, 3", + anyTree(aggregation( + singleGroupingSet("custkey", "name", "nationkey"), + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of("custkey", "name"), // segmented streaming + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_segmented_aggregation_customer", ImmutableMap.of("custkey", "custkey", "name", "name", "nationkey", "nationkey"))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer"); + } + } + + @Test + public void 
testSortedByPrefixOfBucketedKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer2 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // can enable segmented aggregation + assertPlan(orderBasedExecutionEnabled(), + "SELECT name, custkey, COUNT(*) FROM test_segmented_aggregation_customer2 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1, 2", + anyTree(aggregation( + singleGroupingSet("name", "custkey"), + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of("custkey"), // segmented aggregation + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_segmented_aggregation_customer2", ImmutableMap.of("name", "name", "custkey", "custkey"))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer2"); + } + } + + @Test + public void testGroupByKeysShareElementsAsSortedByKeysPrefix() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer_share_elements WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name', 'nationkey'], \n" + + " sorted_by = ARRAY['custkey', 'phone'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // can enable segmented aggregation + assertPlan(orderBasedExecutionEnabled(), + "SELECT name, custkey, nationkey, COUNT(*) FROM test_segmented_aggregation_customer_share_elements \n" + + "WHERE ds = '2021-07-11' GROUP BY 1, 2, 3", + anyTree(aggregation( + singleGroupingSet("name", "custkey", "nationkey"), + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of("custkey"), // 
segmented aggregation + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_segmented_aggregation_customer_share_elements", ImmutableMap.of("name", "name", "custkey", "custkey", "nationkey", "nationkey"))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer_share_elements"); + } + } + + private Session orderBasedExecutionEnabled() + { + return Session.builder(getQueryRunner().getDefaultSession()) + .setCatalogSessionProperty(HIVE_CATALOG, ORDER_BASED_EXECUTION_ENABLED, "true") + .setSystemProperty(SEGMENTED_AGGREGATION_ENABLED, "true") + .build(); + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestStreamingAggregationPlan.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestStreamingAggregationPlan.java index d9e7de1ecd131..dd9c4725c410a 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestStreamingAggregationPlan.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestStreamingAggregationPlan.java @@ -70,7 +70,7 @@ public void testUnsortedTable() // even with streaming aggregation enabled, non-ordered table that can't be applied streaming aggregation would use hash based aggregation assertPlan( - streamingAggregationEnabled(), + orderBasedExecutionEnabled(), "SELECT custkey, COUNT(*) FROM test_customer \n" + "WHERE ds = '2021-07-11' GROUP BY 1", aggregationPlanWithNoStreaming("test_customer", false, "custkey")); @@ -102,7 +102,7 @@ public void testBucketedAndSortedBySameKey() // streaming aggregation enabled assertPlan( - streamingAggregationEnabled(), + orderBasedExecutionEnabled(), "SELECT custkey, COUNT(*) FROM test_customer2 \n" + "WHERE ds = '2021-07-11' GROUP BY 1", node( @@ -136,15 +136,27 @@ public void testBucketedAndSortedByDifferentKeys() "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); // can't enable stream + // since it's bucketed by custkey, the local exchange would be removed when order based execution is enabled 
assertPlan( - streamingAggregationEnabled(), + orderBasedExecutionEnabled(), "SELECT custkey, COUNT(*) FROM test_customer3 \n" + "WHERE ds = '2021-07-11' GROUP BY 1", - aggregationPlanWithNoStreaming("test_customer3", false, "custkey")); + node( + OutputNode.class, + node( + ExchangeNode.class, + aggregation( + singleGroupingSet("custkey"), + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of(), // non-streaming + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_customer3", ImmutableMap.of("custkey", "custkey")))))); // can't enable stream assertPlan( - streamingAggregationEnabled(), + orderBasedExecutionEnabled(), "SELECT name, COUNT(*) FROM test_customer3 \n" + "WHERE ds = '2021-07-11' GROUP BY 1", aggregationPlanWithNoStreaming("test_customer3", true, "name")); @@ -168,7 +180,7 @@ public void testBucketedByPrefixOfSortedKeys() // streaming aggregation enabled assertPlan( - streamingAggregationEnabled(), + orderBasedExecutionEnabled(), "SELECT custkey, name, COUNT(*) FROM test_customer4 \n" + "WHERE ds = '2021-07-11' GROUP BY 1, 2", node( @@ -202,7 +214,7 @@ public void testSortedByPrefixOfBucketedKeys() "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); // can't enable stream - assertPlan(streamingAggregationEnabled(), + assertPlan(orderBasedExecutionEnabled(), "SELECT custkey, COUNT(*) FROM test_customer5 \n" + "WHERE ds = '2021-07-11' GROUP BY 1", aggregationPlanWithNoStreaming("test_customer5", false, "custkey")); } @@ -225,7 +237,7 @@ public void testGroupbySameKeysOfSortedbyKeys() "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); // can enable streaming aggregation - assertPlan(streamingAggregationEnabled(), + assertPlan(orderBasedExecutionEnabled(), "SELECT custkey, name, COUNT(*) FROM test_customer6 \n" + "WHERE ds = '2021-07-11' GROUP BY 1, 2", node( @@ -247,34 +259,36 @@ public void testGroupbySameKeysOfSortedbyKeys() } @Test - public void 
testGroupbySupersetOfSortedKeys() + public void testGroupbySameKeysOfSortedbyKeysWithReverseOrder() { QueryRunner queryRunner = getQueryRunner(); try { - queryRunner.execute("CREATE TABLE test_customer7 WITH ( \n" + - " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + - " sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" + + queryRunner.execute("CREATE TABLE test_customer6_2 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" + + " sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" + " format = 'DWRF' ) AS \n" + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); - // can't enable streaming aggregation, but streaming aggregation session property would disable splittable - assertPlan( - streamingAggregationEnabled(), - "SELECT custkey, name, COUNT(*) FROM test_customer7 \n" + + // can enable streaming aggregation + assertPlan(orderBasedExecutionEnabled(), + "SELECT name, custkey, COUNT(*) FROM test_customer6_2 \n" + "WHERE ds = '2021-07-11' GROUP BY 1, 2", - anyTree(aggregation( - singleGroupingSet("custkey", "name"), - // note: partial aggregation function has no parameter - ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), - ImmutableList.of(), // non-streaming - ImmutableMap.of(), - Optional.empty(), - SINGLE, - node(ProjectNode.class, tableScan("test_customer7", ImmutableMap.of("custkey", "custkey", "name", "name")))))); + node( + OutputNode.class, + node( + ExchangeNode.class, + aggregation( + singleGroupingSet("custkey", "name"), + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of("custkey", "name"), // streaming + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_customer6_2", ImmutableMap.of("custkey", "custkey", "name", "name")))))); } finally { - queryRunner.execute("DROP TABLE IF EXISTS test_customer7"); + queryRunner.execute("DROP TABLE IF EXISTS test_customer6_2"); } } @@ -292,7 +306,7 @@ 
public void testGroupbyKeysNotPrefixOfSortedKeys() // can't enable streaming aggregation assertPlan( - streamingAggregationEnabled(), + orderBasedExecutionEnabled(), "SELECT name, COUNT(*) FROM test_customer8 \n" + "WHERE ds = '2021-07-11' GROUP BY 1", aggregationPlanWithNoStreaming("test_customer8", true, "name")); @@ -302,7 +316,40 @@ public void testGroupbyKeysNotPrefixOfSortedKeys() } } - //todo: add streaming aggregation support when grouping keys are prefix Of sorted keys + @Test + public void testGroupbyKeysPrefixOfSortedKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer9 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // can enable streaming aggregation + assertPlan( + orderBasedExecutionEnabled(), + "SELECT custkey, COUNT(*) FROM test_customer9 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1", + node( + OutputNode.class, + node( + ExchangeNode.class, + aggregation( + singleGroupingSet("custkey"), + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of("custkey"), // streaming + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_customer9", ImmutableMap.of("custkey", "custkey")))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer9"); + } + } // Partition keys @Test @@ -311,29 +358,29 @@ public void testQueryingMultiplePartitions() QueryRunner queryRunner = getQueryRunner(); try { - queryRunner.execute("CREATE TABLE test_customer9 WITH ( \n" + + queryRunner.execute("CREATE TABLE test_customer10 WITH ( \n" + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" + " format = 'DWRF' ) AS \n" + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); - 
queryRunner.execute("INSERT INTO test_customer9 \n" + + queryRunner.execute("INSERT INTO test_customer10 \n" + "SELECT *, '2021-07-12' as ds FROM tpch.sf1.customer LIMIT 1000"); // can't enable streaming aggregation when querying multiple partitions without grouping by partition keys assertPlan( - streamingAggregationEnabled(), - "SELECT custkey, COUNT(*) FROM test_customer9 \n" + + orderBasedExecutionEnabled(), + "SELECT custkey, COUNT(*) FROM test_customer10 \n" + "WHERE ds = '2021-07-11' or ds = '2021-07-12' GROUP BY 1", - aggregationPlanWithNoStreaming("test_customer9", false, "custkey")); + aggregationPlanWithNoStreaming("test_customer10", false, "custkey")); //todo: add streaming aggregation support when grouping keys contain all of the partition keys } finally { - queryRunner.execute("DROP TABLE IF EXISTS test_customer9"); + queryRunner.execute("DROP TABLE IF EXISTS test_customer10"); } } - private Session streamingAggregationEnabled() + private Session orderBasedExecutionEnabled() { return Session.builder(getQueryRunner().getDefaultSession()) .setCatalogSessionProperty(HIVE_CATALOG, ORDER_BASED_EXECUTION_ENABLED, "true") diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index d4efc14a4abcb..3283972342279 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -223,6 +223,7 @@ public final class SystemSessionProperties public static final String MAX_STAGE_COUNT_FOR_EAGER_SCHEDULING = "max_stage_count_for_eager_scheduling"; public static final String HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD = "hyperloglog_standard_error_warning_threshold"; public static final String PREFER_MERGE_JOIN = "prefer_merge_join"; + public static final String SEGMENTED_AGGREGATION_ENABLED = "segmented_aggregation_enabled"; //TODO: Prestissimo related 
session properties that are temporarily put here. They will be relocated in the future public static final String PRESTISSIMO_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled"; @@ -1190,6 +1191,11 @@ public SystemSessionProperties( "To make it work, the connector needs to guarantee and expose the data properties of the underlying table.", featuresConfig.isPreferMergeJoin(), true), + booleanProperty( + SEGMENTED_AGGREGATION_ENABLED, + "Enable segmented aggregation.", + featuresConfig.isSegmentedAggregationEnabled(), + true), new PropertyMetadata<>( AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY, format("Set the strategy used to rewrite AGG IF to AGG FILTER. Options are %s", @@ -2101,6 +2107,11 @@ public static boolean preferMergeJoin(Session session) return session.getSystemProperty(PREFER_MERGE_JOIN, Boolean.class); } + public static boolean isSegmentedAggregationEnabled(Session session) + { + return session.getSystemProperty(SEGMENTED_AGGREGATION_ENABLED, Boolean.class); + } + public static AggregationIfToFilterRewriteStrategy getAggregationIfToFilterRewriteStrategy(Session session) { return session.getSystemProperty(AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY, AggregationIfToFilterRewriteStrategy.class); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 271a43a692b07..5ee88785060ea 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -219,6 +219,7 @@ public class FeaturesConfig private boolean streamingForPartialAggregationEnabled; private boolean preferMergeJoin; + private boolean segmentedAggregationEnabled; private int maxStageCountForEagerScheduling = 25; private boolean quickDistinctLimitEnabled; @@ -2058,6 +2059,18 @@ public FeaturesConfig setPreferMergeJoin(boolean 
preferMergeJoin) return this; } + public boolean isSegmentedAggregationEnabled() + { + return segmentedAggregationEnabled; + } + + @Config("optimizer.segmented-aggregation-enabled") + public FeaturesConfig setSegmentedAggregationEnabled(boolean segmentedAggregationEnabled) + { + this.segmentedAggregationEnabled = segmentedAggregationEnabled; + return this; + } + public boolean isQuickDistinctLimitEnabled() { return quickDistinctLimitEnabled; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughJoin.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughJoin.java index 74117542f1cf7..a86b39775791e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughJoin.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughJoin.java @@ -56,8 +56,8 @@ public class PushPartialAggregationThroughJoin private static boolean isSupportedAggregationNode(AggregationNode aggregationNode) { - // Don't split streaming aggregations - if (aggregationNode.isStreamable()) { + // Don't split streaming aggregations or segmented aggregations + if (aggregationNode.isStreamable() || aggregationNode.isSegmentedAggregationEligible()) { return false; } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index 99afcf6a57a34..471ff9e010771 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -71,6 +71,7 @@ import static com.facebook.presto.SystemSessionProperties.isEnforceFixedDistributionForOutputOperator; import static 
com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled; import static com.facebook.presto.SystemSessionProperties.isQuickDistinctLimitEnabled; +import static com.facebook.presto.SystemSessionProperties.isSegmentedAggregationEnabled; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled; import static com.facebook.presto.common.type.BigintType.BIGINT; @@ -331,10 +332,26 @@ public PlanWithProperties visitAggregation(AggregationNode node, StreamPreferred PlanWithProperties child = planAndEnforce(node.getSource(), childRequirements, childRequirements); List preGroupedSymbols = ImmutableList.of(); - if (!LocalProperties.match(child.getProperties().getLocalProperties(), LocalProperties.grouped(groupingKeys)).get(0).isPresent()) { + // Logic in LocalProperties.match(localProperties, groupingKeys) + // 1. Extract the longest prefix of localProperties to a set that is a subset of groupingKeys + // 2. 
Iterate over the grouping keys and add those that are not in the set to the result + // The result is a single-element List holding an Optional; when present, its GroupingProperty contains the elements collected in step 2 + // E.g.: + // [A, B] [(B, A)] -> List.of(Optional.empty()) + // [A, B] [B] -> List.of(Optional.of(GroupingProperty(B))) + // [A, B] [A] -> List.of(Optional.empty()) + // [A, B] [(A, C)] -> List.of(Optional.of(GroupingProperty(C))) + // [A, B] [(D, A, C)] -> List.of(Optional.of(GroupingProperty(D, C))) + List>> matchResult = LocalProperties.match(child.getProperties().getLocalProperties(), LocalProperties.grouped(groupingKeys)); + if (!matchResult.get(0).isPresent()) { // !isPresent() indicates the property was satisfied completely preGroupedSymbols = groupingKeys; } + else if (matchResult.get(0).get().getColumns().size() < groupingKeys.size() && isSegmentedAggregationEnabled(session)) { + // If the result size equals the groupingKeys size, none of the grouping keys are pre-grouped, so segmented aggregation can't be enabled + // Otherwise some of the grouping keys are pre-grouped and segmented aggregation can be enabled; the result holds the grouping keys that are not pre-grouped + preGroupedSymbols = groupingKeys.stream().filter(groupingKey -> !matchResult.get(0).get().getColumns().contains(groupingKey)).collect(toImmutableList()); + } AggregationNode result = new AggregationNode( node.getSourceLocation(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java index 832d98299c263..a88e444fbca97 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java @@ -166,7 +166,8 @@ public PlanWithProperties visitAggregation(AggregationNode node, HashComputation { Optional 
groupByHash = Optional.empty(); List groupingKeys = node.getGroupingKeys(); - if (!node.isStreamable() && !canSkipHashGeneration(node.getGroupingKeys())) { + if (!node.isStreamable() && !node.isSegmentedAggregationEligible() && !canSkipHashGeneration(node.getGroupingKeys())) { + // todo: for segmented aggregation, add optimizations for the fields that need to compute hash groupByHash = computeHash(groupingKeys, functionAndTypeManager); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java index abb9fdf5dd174..b48433ce7f984 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java @@ -612,6 +612,9 @@ public Void visitAggregation(AggregationNode node, Void context) if (node.getStep() != AggregationNode.Step.SINGLE) { type = format("(%s)", node.getStep().toString()); } + if (node.isSegmentedAggregationEligible()) { + type = format("%s(SEGMENTED, %s)", type, node.getPreGroupedVariables()); + } if (node.isStreamable()) { type = format("%s(STREAMING)", type); } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 486ab2fa45912..97165af968f74 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -194,6 +194,7 @@ public void testDefaults() .setMaxStageCountForEagerScheduling(25) .setHyperloglogStandardErrorWarningThreshold(0.004) .setPreferMergeJoin(false) + .setSegmentedAggregationEnabled(false) .setQueryAnalyzerTimeout(new Duration(3, MINUTES)) .setQuickDistinctLimitEnabled(false)); } @@ -340,6 +341,7 @@ public void 
testExplicitPropertyMappings() .put("execution-policy.max-stage-count-for-eager-scheduling", "123") .put("hyperloglog-standard-error-warning-threshold", "0.02") .put("optimizer.prefer-merge-join", "true") + .put("optimizer.segmented-aggregation-enabled", "true") .put("planner.query-analyzer-timeout", "10s") .put("optimizer.quick-distinct-limit-enabled", "true") .build(); @@ -484,6 +486,7 @@ public void testExplicitPropertyMappings() .setMaxStageCountForEagerScheduling(123) .setHyperloglogStandardErrorWarningThreshold(0.02) .setPreferMergeJoin(true) + .setSegmentedAggregationEnabled(true) .setQueryAnalyzerTimeout(new Duration(10, SECONDS)) .setQuickDistinctLimitEnabled(true); assertFullMapping(properties, expected); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/AggregationNode.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/AggregationNode.java index 2849750866ceb..fbaedeb616d08 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/plan/AggregationNode.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/AggregationNode.java @@ -210,7 +210,18 @@ public PlanNode replaceChildren(List newChildren) public boolean isStreamable() { - return !preGroupedVariables.isEmpty() && groupingSets.getGroupingSetCount() == 1 && groupingSets.getGlobalGroupingSets().isEmpty(); + return !preGroupedVariables.isEmpty() + && groupingSets.getGroupingSetCount() == 1 + && groupingSets.getGlobalGroupingSets().isEmpty() + && preGroupedVariables.size() == groupingSets.groupingKeys.size(); + } + + public boolean isSegmentedAggregationEligible() + { + return !preGroupedVariables.isEmpty() + && groupingSets.getGroupingSetCount() == 1 + && groupingSets.getGlobalGroupingSets().isEmpty() + && preGroupedVariables.size() < groupingSets.groupingKeys.size(); } @Override