From addc4c8391ea49f9a5967d6f589d9daf0eaa30d3 Mon Sep 17 00:00:00 2001 From: Ke Wang Date: Fri, 4 Feb 2022 16:06:27 -0800 Subject: [PATCH 1/3] Add ability to disable splitting file in hive connector --- .../com/facebook/presto/hive/HiveClientConfig.java | 14 ++++++++++++++ .../presto/hive/HiveSessionProperties.java | 11 +++++++++++ .../presto/hive/StoragePartitionLoader.java | 6 +++++- .../facebook/presto/hive/TestHiveClientConfig.java | 7 +++++-- 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index 9747543166992..e77d4420f1ed3 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -206,6 +206,7 @@ public class HiveClientConfig private boolean userDefinedTypeEncodingEnabled; private boolean columnIndexFilterEnabled; + private boolean fileSplittable = true; @Min(0) public int getMaxInitialSplits() @@ -1758,4 +1759,17 @@ public HiveClientConfig setUseRecordPageSourceForCustomSplit(boolean useRecordPa this.useRecordPageSourceForCustomSplit = useRecordPageSourceForCustomSplit; return this; } + + public boolean isFileSplittable() + { + return fileSplittable; + } + + @Config("hive.file-splittable") + @ConfigDescription("By default, this value is true. 
Set to false to make a hive file un-splittable when coordinator schedules splits.") + public HiveClientConfig setFileSplittable(boolean fileSplittable) + { + this.fileSplittable = fileSplittable; + return this; + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 8e801035051c4..68ed2a2a25e94 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -135,6 +135,7 @@ public final class HiveSessionProperties public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; private static final String USE_RECORD_PAGE_SOURCE_FOR_CUSTOM_SPLIT = "use_record_page_source_for_custom_split"; public static final String MAX_INITIAL_SPLITS = "max_initial_splits"; + public static final String FILE_SPLITTABLE = "file_splittable"; private final List> sessionProperties; @@ -663,6 +664,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon MAX_INITIAL_SPLITS, "Hive max initial split count", hiveClientConfig.getMaxInitialSplits(), + true), + booleanProperty( + FILE_SPLITTABLE, + "If a hive file is splittable when coordinator schedules splits", + hiveClientConfig.isFileSplittable(), true)); } @@ -1148,4 +1154,9 @@ public static int getHiveMaxInitialSplitSize(ConnectorSession session) { return session.getProperty(MAX_INITIAL_SPLITS, Integer.class); } + + public static boolean isFileSplittable(ConnectorSession session) + { + return session.getProperty(FILE_SPLITTABLE, Boolean.class); + } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java index 5ebceacfcdbbc..85e0ff910d223 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java 
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java @@ -68,6 +68,7 @@ import static com.facebook.presto.hive.HiveMetadata.shouldCreateFilesForMissingBuckets; import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize; import static com.facebook.presto.hive.HiveSessionProperties.getNodeSelectionStrategy; +import static com.facebook.presto.hive.HiveSessionProperties.isFileSplittable; import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache; import static com.facebook.presto.hive.HiveUtil.getFooterCount; import static com.facebook.presto.hive.HiveUtil.getHeaderCount; @@ -264,7 +265,10 @@ public ListenableFuture loadPartition(HivePartitionMetadata partition, HiveSp // Partial aggregation pushdown works at the granularity of individual files // therefore we must not split files when either is enabled. // Skip header / footer lines are not splittable except for a special case when skip.header.line.count=1 - boolean splittable = !s3SelectPushdownEnabled && !partialAggregationsPushedDown && getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1; + boolean splittable = isFileSplittable(session) && + !s3SelectPushdownEnabled && + !partialAggregationsPushedDown && + getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1; // Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping if (tableBucketInfo.isPresent()) { diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index a33ed612c67d6..65372bc113f4e 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -161,7 +161,8 @@ public void testDefaults() .setSizeBasedSplitWeightsEnabled(true) .setMinimumAssignedSplitWeight(0.05) 
.setUserDefinedTypeEncodingEnabled(false) - .setUseRecordPageSourceForCustomSplit(true)); + .setUseRecordPageSourceForCustomSplit(true) + .setFileSplittable(true)); } @Test @@ -284,6 +285,7 @@ public void testExplicitPropertyMappings() .put("hive.user-defined-type-encoding-enabled", "true") .put("hive.minimum-assigned-split-weight", "1.0") .put("hive.use-record-page-source-for-custom-split", "false") + .put("hive.file-splittable", "false") .build(); HiveClientConfig expected = new HiveClientConfig() @@ -402,7 +404,8 @@ public void testExplicitPropertyMappings() .setSizeBasedSplitWeightsEnabled(false) .setMinimumAssignedSplitWeight(1.0) .setUserDefinedTypeEncodingEnabled(true) - .setUseRecordPageSourceForCustomSplit(false); + .setUseRecordPageSourceForCustomSplit(false) + .setFileSplittable(false); ConfigAssertions.assertFullMapping(properties, expected); } From a86ec945fc037da787636c2679e8d06b0d4c1d3e Mon Sep 17 00:00:00 2001 From: Ke Wang Date: Tue, 8 Feb 2022 11:29:25 -0800 Subject: [PATCH 2/3] Add ability to do streaming aggregation for hive table scans As of now, we only support streaming aggregation for the cases where group-by keys are the same as order-by keys, cases where group-by keys are a subset of order-by keys are not supported for now. 
Co-Authored-By: Zhan Yuan --- .../presto/hive/HiveClientConfig.java | 14 + .../facebook/presto/hive/HiveMetadata.java | 39 +- .../presto/hive/HiveSessionProperties.java | 11 + .../presto/hive/StoragePartitionLoader.java | 3 + .../presto/hive/TestHiveClientConfig.java | 3 + .../hive/TestStreamingAggregationPlan.java | 371 ++++++++++++++++++ 6 files changed, 439 insertions(+), 2 deletions(-) create mode 100644 presto-hive/src/test/java/com/facebook/presto/hive/TestStreamingAggregationPlan.java diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index e77d4420f1ed3..28b6abd0fdc52 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -155,6 +155,7 @@ public class HiveClientConfig private boolean s3SelectPushdownEnabled; private int s3SelectPushdownMaxConnections = 500; + private boolean streamingAggregationEnabled; private boolean isTemporaryStagingDirectoryEnabled = true; private String temporaryStagingDirectoryPath = "/tmp/presto-${USER}"; @@ -1356,6 +1357,19 @@ public HiveClientConfig setS3SelectPushdownMaxConnections(int s3SelectPushdownMa return this; } + public boolean isStreamingAggregationEnabled() + { + return streamingAggregationEnabled; + } + + @Config("hive.streaming-aggregation-enabled") + @ConfigDescription("Enable streaming aggregation execution") + public HiveClientConfig setStreamingAggregationEnabled(boolean streamingAggregationEnabled) + { + this.streamingAggregationEnabled = streamingAggregationEnabled; + return this; + } + public boolean isTemporaryStagingDirectoryEnabled() { return isTemporaryStagingDirectoryEnabled; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index 47c53c74e5949..8ae0c170a748a 100644 --- 
a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -63,6 +63,7 @@ import com.facebook.presto.spi.Constraint; import com.facebook.presto.spi.DiscretePredicates; import com.facebook.presto.spi.InMemoryRecordSet; +import com.facebook.presto.spi.LocalProperty; import com.facebook.presto.spi.MaterializedViewNotFoundException; import com.facebook.presto.spi.MaterializedViewStatus; import com.facebook.presto.spi.PrestoException; @@ -70,6 +71,7 @@ import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.SortingProperty; import com.facebook.presto.spi.SystemTable; import com.facebook.presto.spi.TableLayoutFilterCoverage; import com.facebook.presto.spi.TableNotFoundException; @@ -222,6 +224,7 @@ import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled; +import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType; import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable; import static com.facebook.presto.hive.HiveStorageFormat.AVRO; @@ -2758,14 +2761,46 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa predicate = createPredicate(partitionColumns, partitions); } + // Expose ordering property of the table. 
+ ImmutableList.Builder> localProperties = ImmutableList.builder(); + Optional> streamPartitionColumns = Optional.empty(); + if (table.getStorage().getBucketProperty().isPresent() && !table.getStorage().getBucketProperty().get().getSortedBy().isEmpty()) { + ImmutableSet.Builder streamPartitionColumnsBuilder = ImmutableSet.builder(); + + // streamPartitioningColumns is how we partition the data across splits. + // localProperty is how we partition the data within a split. + // 1. add partition columns to streamPartitionColumns + partitionColumns.forEach(streamPartitionColumnsBuilder::add); + + // 2. add sorted columns to streamPartitionColumns and localProperties + HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty().get(); + Map columnHandles = hiveColumnHandles(table).stream() + .collect(toImmutableMap(HiveColumnHandle::getName, identity())); + bucketProperty.getSortedBy().forEach(sortingColumn -> { + ColumnHandle columnHandle = columnHandles.get(sortingColumn.getColumnName()); + localProperties.add(new SortingProperty<>(columnHandle, sortingColumn.getOrder().getSortOrder())); + streamPartitionColumnsBuilder.add(columnHandle); + }); + + // We currently only set streamPartitionColumns when it enables streaming aggregation and also it's eligible to enable streaming aggregation + // 1. When the bucket columns are the same as the prefix of the sort columns + // 2. When all rows of the same value group are guaranteed to be in the same split. We disable splitting a file when isStreamingAggregationEnabled is true to make sure the property is guaranteed. 
+ List sortColumns = bucketProperty.getSortedBy().stream().map(SortingColumn::getColumnName).collect(toImmutableList()); + if (bucketProperty.getBucketedBy().size() <= sortColumns.size() + && bucketProperty.getBucketedBy().containsAll(sortColumns.subList(0, bucketProperty.getBucketedBy().size())) + && isStreamingAggregationEnabled(session)) { + streamPartitionColumns = Optional.of(streamPartitionColumnsBuilder.build()); + } + } + return new ConnectorTableLayout( hiveLayoutHandle, Optional.empty(), predicate, tablePartitioning, - Optional.empty(), + streamPartitionColumns, discretePredicates, - ImmutableList.of(), + localProperties.build(), Optional.of(hiveLayoutHandle.getRemainingPredicate())); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 68ed2a2a25e94..2ef432311d66b 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -94,6 +94,7 @@ public final class HiveSessionProperties public static final String PARTITION_STATISTICS_BASED_OPTIMIZATION_ENABLED = "partition_stats_based_optimization_enabled"; private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count"; private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled"; + public static final String STREAMING_AGGREGATION_ENABLED = "streaming_aggregation_enabled"; public static final String SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE = "shuffle_partitioned_columns_for_table_write"; public static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled"; private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path"; @@ -414,6 +415,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon "S3 Select pushdown 
enabled", hiveClientConfig.isS3SelectPushdownEnabled(), false), + booleanProperty( + STREAMING_AGGREGATION_ENABLED, + "Enable streaming aggregation execution", + hiveClientConfig.isStreamingAggregationEnabled(), + false), booleanProperty( TEMPORARY_STAGING_DIRECTORY_ENABLED, "Should use temporary staging directory for write operations", @@ -889,6 +895,11 @@ public static boolean isS3SelectPushdownEnabled(ConnectorSession session) return session.getProperty(S3_SELECT_PUSHDOWN_ENABLED, Boolean.class); } + public static boolean isStreamingAggregationEnabled(ConnectorSession session) + { + return session.getProperty(STREAMING_AGGREGATION_ENABLED, Boolean.class); + } + public static boolean isStatisticsEnabled(ConnectorSession session) { return session.getProperty(STATISTICS_ENABLED, Boolean.class); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java index 85e0ff910d223..f7a0583dc4d53 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java @@ -69,6 +69,7 @@ import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize; import static com.facebook.presto.hive.HiveSessionProperties.getNodeSelectionStrategy; import static com.facebook.presto.hive.HiveSessionProperties.isFileSplittable; +import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache; import static com.facebook.presto.hive.HiveUtil.getFooterCount; import static com.facebook.presto.hive.HiveUtil.getHeaderCount; @@ -261,11 +262,13 @@ public ListenableFuture loadPartition(HivePartitionMetadata partition, HiveSp return addSplitsToSource(splits, splitFactory, hiveSplitSource, stopped); } PathFilter pathFilter = 
isHudiParquetInputFormat(inputFormat) ? hoodiePathFilterLoadingCache.getUnchecked(configuration) : path1 -> true; + // Streaming aggregation works at the granularity of individual files // S3 Select pushdown works at the granularity of individual S3 objects, // Partial aggregation pushdown works at the granularity of individual files // therefore we must not split files when either is enabled. // Skip header / footer lines are not splittable except for a special case when skip.header.line.count=1 boolean splittable = isFileSplittable(session) && + !isStreamingAggregationEnabled(session) && !s3SelectPushdownEnabled && !partialAggregationsPushedDown && getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1; diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index 65372bc113f4e..69f3e2b48bfa6 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -124,6 +124,7 @@ public void testDefaults() .setPartitionStatisticsBasedOptimizationEnabled(false) .setS3SelectPushdownEnabled(false) .setS3SelectPushdownMaxConnections(500) + .setStreamingAggregationEnabled(false) .setTemporaryStagingDirectoryEnabled(true) .setTemporaryStagingDirectoryPath("/tmp/presto-${USER}") .setTemporaryTableSchema("default") @@ -247,6 +248,7 @@ public void testExplicitPropertyMappings() .put("hive.partition-statistics-based-optimization-enabled", "true") .put("hive.s3select-pushdown.enabled", "true") .put("hive.s3select-pushdown.max-connections", "1234") + .put("hive.streaming-aggregation-enabled", "true") .put("hive.temporary-staging-directory-enabled", "false") .put("hive.temporary-staging-directory-path", "updated") .put("hive.temporary-table-schema", "other") @@ -367,6 +369,7 @@ public void testExplicitPropertyMappings() 
.setPartitionStatisticsBasedOptimizationEnabled(true) .setS3SelectPushdownEnabled(true) .setS3SelectPushdownMaxConnections(1234) + .setStreamingAggregationEnabled(true) .setTemporaryStagingDirectoryEnabled(false) .setTemporaryStagingDirectoryPath("updated") .setTemporaryTableSchema("other") diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestStreamingAggregationPlan.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestStreamingAggregationPlan.java new file mode 100644 index 0000000000000..5deeb894d5752 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestStreamingAggregationPlan.java @@ -0,0 +1,371 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.hive; + +import com.facebook.presto.Session; +import com.facebook.presto.spi.plan.ProjectNode; +import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.OutputNode; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG; +import static com.facebook.presto.hive.HiveSessionProperties.STREAMING_AGGREGATION_ENABLED; +import static com.facebook.presto.spi.plan.AggregationNode.Step.FINAL; +import static com.facebook.presto.spi.plan.AggregationNode.Step.PARTIAL; +import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anySymbol; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.functionCall; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.singleGroupingSet; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan; +import static io.airlift.tpch.TpchTable.CUSTOMER; +import static io.airlift.tpch.TpchTable.LINE_ITEM; +import static io.airlift.tpch.TpchTable.NATION; +import static io.airlift.tpch.TpchTable.ORDERS; + +public class TestStreamingAggregationPlan + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.createQueryRunner( + 
ImmutableList.of(ORDERS, LINE_ITEM, CUSTOMER, NATION), + ImmutableMap.of("experimental.pushdown-subfields-enabled", "true"), + Optional.empty()); + } + + @Test + public void testUnsortedTable() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // even with streaming aggregation enabled, non-ordered table that can't be applied streaming aggregation would use hash based aggregation + assertPlan( + streamingAggregationEnabled(), + "SELECT custkey, COUNT(*) FROM test_customer \n" + + "WHERE ds = '2021-07-11' GROUP BY 1", + aggregationPlanWithNoStreaming("test_customer", false, "custkey")); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer"); + } + } + + // bucket-keys and sorted keys + @Test + public void testBucketedAndSortedBySameKey() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer2 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + queryRunner.execute("INSERT INTO test_customer2 \n" + + "SELECT *, '2021-07-12' as ds FROM tpch.sf1.customer LIMIT 1000"); + + // default: streaming aggregation is not turned on by default and hash based aggregation would be used + assertPlan("SELECT custkey, COUNT(*) FROM test_customer2 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1", aggregationPlanWithNoStreaming("test_customer2", false, "custkey")); + + // streaming aggregation enabled + assertPlan( + streamingAggregationEnabled(), + "SELECT custkey, COUNT(*) FROM test_customer2 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1", + node( + OutputNode.class, + 
node( + ExchangeNode.class, + aggregation( + singleGroupingSet("custkey"), + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of("custkey"), // streaming + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_customer2", ImmutableMap.of("custkey", "custkey")))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer2"); + } + } + + @Test + public void testBucketedAndSortedByDifferentKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer3 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['name'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // can't enable stream + assertPlan( + streamingAggregationEnabled(), + "SELECT custkey, COUNT(*) FROM test_customer3 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1", + aggregationPlanWithNoStreaming("test_customer3", false, "custkey")); + + // can't enable stream + assertPlan( + streamingAggregationEnabled(), + "SELECT name, COUNT(*) FROM test_customer3 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1", + aggregationPlanWithNoStreaming("test_customer3", true, "name")); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer3"); + } + } + + @Test + public void testBucketedByPrefixOfSortedKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer4 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // streaming aggregation enabled + assertPlan( + streamingAggregationEnabled(), + "SELECT custkey, name, COUNT(*) FROM test_customer4 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1, 2", + node( + OutputNode.class, 
+ node( + ExchangeNode.class, + aggregation( + singleGroupingSet("custkey", "name"), + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of("custkey", "name"), // streaming + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_customer4", ImmutableMap.of("custkey", "custkey", "name", "name")))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer4"); + } + } + + @Test + public void testSortedByPrefixOfBucketedKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer5 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // can't enable stream + assertPlan(streamingAggregationEnabled(), + "SELECT custkey, COUNT(*) FROM test_customer5 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1", aggregationPlanWithNoStreaming("test_customer5", false, "custkey")); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer5"); + } + } + + // Sorted keys and groupby keys + @Test + public void testGroupbySameKeysOfSortedbyKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer6 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" + + " sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // can enable streaming aggregation + assertPlan(streamingAggregationEnabled(), + "SELECT custkey, name, COUNT(*) FROM test_customer6 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1, 2", + node( + OutputNode.class, + node( + ExchangeNode.class, + aggregation( + singleGroupingSet("custkey", "name"), + ImmutableMap.of(Optional.empty(), functionCall("count", 
ImmutableList.of())), + ImmutableList.of("custkey", "name"), // streaming + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("test_customer6", ImmutableMap.of("custkey", "custkey", "name", "name")))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer6"); + } + } + + @Test + public void testGroupbySupersetOfSortedKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer7 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // can't enable streaming aggregation, but streaming aggregation session property would disable splittable + assertPlan( + streamingAggregationEnabled(), + "SELECT custkey, name, COUNT(*) FROM test_customer7 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1, 2", + anyTree(aggregation( + singleGroupingSet("custkey", "name"), + // note: partial aggregation function has no parameter + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of(), // non-streaming + ImmutableMap.of(), + Optional.empty(), + SINGLE, + node(ProjectNode.class, tableScan("test_customer7", ImmutableMap.of("custkey", "custkey", "name", "name")))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer7"); + } + } + + @Test + public void testGroupbyKeysNotPrefixOfSortedKeys() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer8 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" + + " sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + + // can't enable streaming aggregation + assertPlan( + streamingAggregationEnabled(), + "SELECT name, COUNT(*) FROM 
test_customer8 \n" + + "WHERE ds = '2021-07-11' GROUP BY 1", + aggregationPlanWithNoStreaming("test_customer8", true, "name")); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer8"); + } + } + + //todo: add streaming aggregation support when grouping keys are prefix Of sorted keys + + // Partition keys + @Test + public void testQueryingMultiplePartitions() + { + QueryRunner queryRunner = getQueryRunner(); + + try { + queryRunner.execute("CREATE TABLE test_customer9 WITH ( \n" + + " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" + + " sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" + + " format = 'DWRF' ) AS \n" + + "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n"); + queryRunner.execute("INSERT INTO test_customer9 \n" + + "SELECT *, '2021-07-12' as ds FROM tpch.sf1.customer LIMIT 1000"); + + // can't enable streaming aggregation when querying multiple partitions without grouping by partition keys + assertPlan( + streamingAggregationEnabled(), + "SELECT custkey, COUNT(*) FROM test_customer9 \n" + + "WHERE ds = '2021-07-11' or ds = '2021-07-12' GROUP BY 1", + aggregationPlanWithNoStreaming("test_customer9", false, "custkey")); + + //todo: add streaming aggregation support when grouping keys contain all of the partition keys + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS test_customer9"); + } + } + + private Session streamingAggregationEnabled() + { + return Session.builder(getQueryRunner().getDefaultSession()) + .setCatalogSessionProperty(HIVE_CATALOG, STREAMING_AGGREGATION_ENABLED, "true") + .build(); + } + + private PlanMatchPattern aggregationPlanWithNoStreaming(String tableName, boolean hasProject, String... 
groupingKeys) + { + ImmutableMap.Builder columnReferencesBuilder = ImmutableMap.builder(); + for (String groupingKey : groupingKeys) { + columnReferencesBuilder.put(groupingKey, groupingKey); + } + PlanMatchPattern tableScanPattern = tableScan(tableName, columnReferencesBuilder.build()); + + return anyTree(aggregation( + singleGroupingSet(groupingKeys), + // note: final aggregation function's parameter is of size one + ImmutableMap.of(Optional.empty(), functionCall("count", false, ImmutableList.of(anySymbol()))), + ImmutableList.of(), // non-streaming + ImmutableMap.of(), + Optional.empty(), + FINAL, + node( + ExchangeNode.class, + anyTree(aggregation( + singleGroupingSet(groupingKeys), + // note: partial aggregation function has no parameter + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of(), // non-streaming + ImmutableMap.of(), + Optional.empty(), + PARTIAL, + hasProject ? node(ProjectNode.class, tableScanPattern) : tableScanPattern))))); + } +} From 7a0ac108b9e8c5e42f15df49ceb279b79cf8c936 Mon Sep 17 00:00:00 2001 From: Ke Wang Date: Wed, 9 Feb 2022 12:38:35 -0800 Subject: [PATCH 3/3] Add ability to do streaming for partial aggregation We can always enable streaming aggregation for partial aggregations without affecting correctness. But if the data isn't clustered (for example: ordered) by the group-by keys, it may cause regressions on latency and resource usage. This session property is just a solution to force enabling streaming aggregation when we know the execution would benefit from partial streaming aggregation. We can work on determining it based on the input table properties later. 
--- .../presto/SystemSessionProperties.java | 11 ++++ .../presto/sql/analyzer/FeaturesConfig.java | 14 +++++ ...PushPartialAggregationThroughExchange.java | 11 +++- .../planner/optimizations/SymbolMapper.java | 2 +- .../sql/analyzer/TestFeaturesConfig.java | 7 ++- .../TestStreamingForPartialAggregation.java | 52 +++++++++++++++++++ 6 files changed, 93 insertions(+), 4 deletions(-) create mode 100644 presto-main/src/test/java/com/facebook/presto/sql/planner/TestStreamingForPartialAggregation.java diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 9e57d9ea9a877..683eab46defe1 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -217,6 +217,7 @@ public final class SystemSessionProperties public static final String EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_DIRECTORY = "exceeded_memory_limit_heap_dump_file_directory"; public static final String DISTRIBUTED_TRACING_MODE = "distributed_tracing_mode"; public static final String VERBOSE_RUNTIME_STATS_ENABLED = "verbose_runtime_stats_enabled"; + public static final String STREAMING_FOR_PARTIAL_AGGREGATION_ENABLED = "streaming_for_partial_aggregation_enabled"; //TODO: Prestissimo related session properties that are temporarily put here. 
They will be relocated in the future public static final String PRESTISSIMO_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled"; @@ -1158,6 +1159,11 @@ public SystemSessionProperties( "Enable logging all runtime stats", featuresConfig.isVerboseRuntimeStatsEnabled(), false), + booleanProperty( + STREAMING_FOR_PARTIAL_AGGREGATION_ENABLED, + "Enable streaming for partial aggregation", + featuresConfig.isStreamingForPartialAggregationEnabled(), + false), new PropertyMetadata<>( AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY, format("Set the strategy used to rewrite AGG IF to AGG FILTER. Options are %s", @@ -2034,6 +2040,11 @@ public static boolean isVerboseRuntimeStatsEnabled(Session session) return session.getSystemProperty(VERBOSE_RUNTIME_STATS_ENABLED, Boolean.class); } + public static boolean isStreamingForPartialAggregationEnabled(Session session) + { + return session.getSystemProperty(STREAMING_FOR_PARTIAL_AGGREGATION_ENABLED, Boolean.class); + } + public static AggregationIfToFilterRewriteStrategy getAggregationIfToFilterRewriteStrategy(Session session) { return session.getSystemProperty(AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY, AggregationIfToFilterRewriteStrategy.class); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 92449aaf93a13..cd1e78e318bc8 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -216,6 +216,8 @@ public class FeaturesConfig private boolean hashBasedDistinctLimitEnabled; private int hashBasedDistinctLimitThreshold = 10000; + private boolean streamingForPartialAggregationEnabled; + public enum PartitioningPrecisionStrategy { // Let Presto decide when to repartition @@ -1957,4 +1959,16 @@ public int getHashBasedDistinctLimitThreshold() { return 
hashBasedDistinctLimitThreshold; } + + public boolean isStreamingForPartialAggregationEnabled() + { + return streamingForPartialAggregationEnabled; + } + + @Config("streaming-for-partial-aggregation-enabled") + public FeaturesConfig setStreamingForPartialAggregationEnabled(boolean streamingForPartialAggregationEnabled) + { + this.streamingForPartialAggregationEnabled = streamingForPartialAggregationEnabled; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java index 6bc04b1cf6892..d37b3a2e0109c 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java @@ -46,6 +46,7 @@ import static com.facebook.presto.SystemSessionProperties.getPartialAggregationByteReductionThreshold; import static com.facebook.presto.SystemSessionProperties.getPartialAggregationStrategy; +import static com.facebook.presto.SystemSessionProperties.isStreamingForPartialAggregationEnabled; import static com.facebook.presto.operator.aggregation.AggregationUtils.isDecomposable; import static com.facebook.presto.spi.plan.AggregationNode.Step.FINAL; import static com.facebook.presto.spi.plan.AggregationNode.Step.PARTIAL; @@ -254,6 +255,14 @@ private PlanNode split(AggregationNode node, Context context) Optional.empty())); } + // We can always enable streaming aggregation for partial aggregations. But if the table is not pre-grouped by the group-by columns, it may have regressions. + // This session property is just a solution to force enabling when we know the execution would benefit from partial streaming aggregation. + // We can work on determining it based on the input table properties later. 
+ List preGroupedSymbols = ImmutableList.of(); + if (isStreamingForPartialAggregationEnabled(context.getSession())) { + preGroupedSymbols = ImmutableList.copyOf(node.getGroupingSets().getGroupingKeys()); + } + PlanNode partial = new AggregationNode( node.getSourceLocation(), context.getIdAllocator().getNextId(), @@ -262,7 +271,7 @@ private PlanNode split(AggregationNode node, Context context) node.getGroupingSets(), // preGroupedSymbols reflect properties of the input. Splitting the aggregation and pushing partial aggregation // through the exchange may or may not preserve these properties. Hence, it is safest to drop preGroupedSymbols here. - ImmutableList.of(), + preGroupedSymbols, PARTIAL, node.getHashVariable(), node.getGroupIdVariable()); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java index 58118ac45c4d0..9cb8152b16e24 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java @@ -187,7 +187,7 @@ private AggregationNode map(AggregationNode node, PlanNode source, PlanNodeId ne mapAndDistinctVariable(node.getGroupingKeys()), node.getGroupingSetCount(), node.getGlobalGroupingSets()), - ImmutableList.of(), + mapAndDistinctVariable(node.getPreGroupedVariables()), node.getStep(), node.getHashVariable().map(this::map), node.getGroupIdVariable().map(this::map)); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index b413f884b2ae2..8b136be7e0e4b 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -187,7 +187,8 @@ public 
void testDefaults() .setVerboseRuntimeStatsEnabled(false) .setAggregationIfToFilterRewriteStrategy(AggregationIfToFilterRewriteStrategy.DISABLED) .setHashBasedDistinctLimitEnabled(false) - .setHashBasedDistinctLimitThreshold(10000)); + .setHashBasedDistinctLimitThreshold(10000) + .setStreamingForPartialAggregationEnabled(false)); } @Test @@ -326,6 +327,7 @@ public void testExplicitPropertyMappings() .put("optimizer.aggregation-if-to-filter-rewrite-strategy", "filter_with_if") .put("hash-based-distinct-limit-enabled", "true") .put("hash-based-distinct-limit-threshold", "500") + .put("streaming-for-partial-aggregation-enabled", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -461,7 +463,8 @@ public void testExplicitPropertyMappings() .setVerboseRuntimeStatsEnabled(true) .setAggregationIfToFilterRewriteStrategy(AggregationIfToFilterRewriteStrategy.FILTER_WITH_IF) .setHashBasedDistinctLimitEnabled(true) - .setHashBasedDistinctLimitThreshold(500); + .setHashBasedDistinctLimitThreshold(500) + .setStreamingForPartialAggregationEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/TestStreamingForPartialAggregation.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestStreamingForPartialAggregation.java new file mode 100644 index 0000000000000..35d8969a32230 --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestStreamingForPartialAggregation.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner; + +import com.facebook.presto.sql.planner.assertions.BasePlanTest; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static com.facebook.presto.SystemSessionProperties.STREAMING_FOR_PARTIAL_AGGREGATION_ENABLED; +import static com.facebook.presto.spi.plan.AggregationNode.Step.PARTIAL; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.functionCall; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.singleGroupingSet; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan; + +public class TestStreamingForPartialAggregation + extends BasePlanTest +{ + TestStreamingForPartialAggregation() + { + super(ImmutableMap.of(STREAMING_FOR_PARTIAL_AGGREGATION_ENABLED, "true")); + } + + @Test + public void testMultidates() + { + assertPlan("SELECT clerk, count(*) FROM orders GROUP BY 1", + anyTree(aggregation( + singleGroupingSet("clerk"), + ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())), + ImmutableList.of("clerk"), // streaming + ImmutableMap.of(), + Optional.empty(), + PARTIAL, + tableScan("orders", ImmutableMap.of("clerk", "clerk"))))); + } +}