Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,14 @@
import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
import static com.facebook.presto.hive.HiveSessionProperties.isOptimizedPartitionUpdateSerializationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isOrderBasedExecutionEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetPushdownFilterEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isPreferManifestsToListFiles;
import static com.facebook.presto.hive.HiveSessionProperties.isRespectTableFormat;
import static com.facebook.presto.hive.HiveSessionProperties.isShufflePartitionedColumnsForTableWriteEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType;
import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable;
import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
Expand Down Expand Up @@ -2819,36 +2819,33 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
predicate = createPredicate(partitionColumns, partitions);
}

// Expose ordering property of the table.
ImmutableList.Builder<LocalProperty<ColumnHandle>> localProperties = ImmutableList.builder();
// Expose ordering property of the table when order based execution is enabled.
ImmutableList.Builder<LocalProperty<ColumnHandle>> localPropertyBuilder = ImmutableList.builder();
Optional<Set<ColumnHandle>> streamPartitionColumns = Optional.empty();
if (table.getStorage().getBucketProperty().isPresent() && !table.getStorage().getBucketProperty().get().getSortedBy().isEmpty()) {
if (table.getStorage().getBucketProperty().isPresent()
&& !table.getStorage().getBucketProperty().get().getSortedBy().isEmpty()
&& isOrderBasedExecutionEnabled(session)) {
ImmutableSet.Builder<ColumnHandle> streamPartitionColumnsBuilder = ImmutableSet.builder();
Map<String, ColumnHandle> columnHandles = hiveColumnHandles(table).stream()
.collect(toImmutableMap(HiveColumnHandle::getName, identity()));

// streamPartitioningColumns is how we partition the data across splits.
// localProperty is how we partition the data within a split.
// 1. add partition columns to streamPartitionColumns
partitionColumns.forEach(streamPartitionColumnsBuilder::add);

// 2. add sorted columns to streamPartitionColumns and localProperties
HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty().get();
Map<String, ColumnHandle> columnHandles = hiveColumnHandles(table).stream()
.collect(toImmutableMap(HiveColumnHandle::getName, identity()));
bucketProperty.getSortedBy().forEach(sortingColumn -> {
ColumnHandle columnHandle = columnHandles.get(sortingColumn.getColumnName());
localProperties.add(new SortingProperty<>(columnHandle, sortingColumn.getOrder().getSortOrder()));
// 1. add partition columns and bucketed-by columns to streamPartitionColumns
// when order based execution is enabled, splitting is disabled and data is sharded across splits when table is bucketed.
partitionColumns.forEach(streamPartitionColumnsBuilder::add);
table.getStorage().getBucketProperty().get().getBucketedBy().forEach(bucketedByColumn -> {
ColumnHandle columnHandle = columnHandles.get(bucketedByColumn);
streamPartitionColumnsBuilder.add(columnHandle);
});

// We currently only set streamPartitionColumns when it enables streaming aggregation and also it's eligible to enable streaming aggregation
// 1. When the bucket columns are the same as the prefix of the sort columns
// 2. When all rows of the same value group are guaranteed to be in the same split. We disable splitting a file when isStreamingAggregationEnabled is true to make sure the property is guaranteed.
List<String> sortColumns = bucketProperty.getSortedBy().stream().map(SortingColumn::getColumnName).collect(toImmutableList());
if (bucketProperty.getBucketedBy().size() <= sortColumns.size()
&& bucketProperty.getBucketedBy().containsAll(sortColumns.subList(0, bucketProperty.getBucketedBy().size()))
&& isStreamingAggregationEnabled(session)) {
streamPartitionColumns = Optional.of(streamPartitionColumnsBuilder.build());
}
// 2. add sorted-by columns to localPropertyBuilder
table.getStorage().getBucketProperty().get().getSortedBy().forEach(sortingColumn -> {
ColumnHandle columnHandle = columnHandles.get(sortingColumn.getColumnName());
localPropertyBuilder.add(new SortingProperty<>(columnHandle, sortingColumn.getOrder().getSortOrder()));
});
streamPartitionColumns = Optional.of(streamPartitionColumnsBuilder.build());
}

return new ConnectorTableLayout(
Expand All @@ -2858,7 +2855,7 @@ && isStreamingAggregationEnabled(session)) {
tablePartitioning,
streamPartitionColumns,
discretePredicates,
localProperties.build(),
localPropertyBuilder.build(),
Optional.of(hiveLayoutHandle.getRemainingPredicate()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ public static boolean isS3SelectPushdownEnabled(ConnectorSession session)
return session.getProperty(S3_SELECT_PUSHDOWN_ENABLED, Boolean.class);
}

public static boolean isStreamingAggregationEnabled(ConnectorSession session)
/**
 * Returns whether order-based execution is enabled for the given session,
 * as configured by the {@code ORDER_BASED_EXECUTION_ENABLED} session property.
 */
public static boolean isOrderBasedExecutionEnabled(ConnectorSession session)
{
    boolean orderBasedExecutionEnabled = session.getProperty(ORDER_BASED_EXECUTION_ENABLED, Boolean.class);
    return orderBasedExecutionEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.hive.HiveSessionProperties.isFileSplittable;
import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isOrderBasedExecutionEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
import static com.facebook.presto.hive.HiveUtil.getFooterCount;
import static com.facebook.presto.hive.HiveUtil.getHeaderCount;
Expand Down Expand Up @@ -267,7 +267,7 @@ public ListenableFuture<?> loadPartition(HivePartitionMetadata partition, HiveSp
// therefore we must not split files when either is enabled.
// Skip header / footer lines are not splittable except for a special case when skip.header.line.count=1
boolean splittable = isFileSplittable(session) &&
!isStreamingAggregationEnabled(session) &&
!isOrderBasedExecutionEnabled(session) &&
!s3SelectPushdownEnabled &&
!partialAggregationsPushedDown &&
getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.SEGMENTED_AGGREGATION_ENABLED;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.ORDER_BASED_EXECUTION_ENABLED;
import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.functionCall;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.singleGroupingSet;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;
import static io.airlift.tpch.TpchTable.CUSTOMER;
import static io.airlift.tpch.TpchTable.LINE_ITEM;
import static io.airlift.tpch.TpchTable.NATION;
import static io.airlift.tpch.TpchTable.ORDERS;

/**
 * Plan tests for segmented aggregation over bucketed and sorted Hive tables.
 *
 * <p>Each test creates a bucketed/sorted/partitioned table, runs an aggregation
 * query with order-based execution and segmented aggregation enabled (see
 * {@link #orderBasedExecutionEnabled()}), and asserts whether the planner
 * populates the aggregation node's pre-grouped ("segmented") symbol list.
 */
public class TestSegmentedAggregation
        extends AbstractTestQueryFramework
{
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        // Hive query runner preloaded with the TPCH tables these tests read from.
        return HiveQueryRunner.createQueryRunner(
                ImmutableList.of(ORDERS, LINE_ITEM, CUSTOMER, NATION),
                ImmutableMap.of("experimental.pushdown-subfields-enabled", "true"),
                Optional.empty());
    }

    /**
     * Table is sorted by (name, custkey) but the query groups by custkey only:
     * the leading sorted-by key ("name") is not among the group-by keys, so the
     * plan must NOT use segmented aggregation (empty pre-grouped symbol list).
     */
    @Test
    public void testSortedbyKeysPrefixNotASubsetOfGroupbyKeys()
    {
        QueryRunner queryRunner = getQueryRunner();

        try {
            queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer0 WITH ( \n" +
                    " bucket_count = 4, bucketed_by = ARRAY['custkey'], \n" +
                    " sorted_by = ARRAY['name', 'custkey'], partitioned_by=array['ds'], \n" +
                    " format = 'DWRF' ) AS \n" +
                    "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

            // can't enable segmented aggregation
            assertPlan(orderBasedExecutionEnabled(),
                    "SELECT custkey, count(name) FROM test_segmented_aggregation_customer0 \n" +
                    "WHERE ds = '2021-07-11' GROUP BY 1",
                    anyTree(aggregation(
                            singleGroupingSet("custkey"),
                            ImmutableMap.of(Optional.of("count"), functionCall("count", ImmutableList.of("name"))),
                            ImmutableList.of(), // no segmented streaming
                            ImmutableMap.of(),
                            Optional.empty(),
                            SINGLE,
                            tableScan("test_segmented_aggregation_customer0", ImmutableMap.of("custkey", "custkey", "name", "name")))));
        }
        finally {
            // Clean up the table even when the plan assertion fails.
            queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer0");
        }
    }

    /**
     * Table bucketed and sorted by (custkey, name); the query groups by
     * (custkey, name, nationkey), a superset of the sorted-by keys, so segmented
     * aggregation is expected on (custkey, name).
     */
    // NOTE(review): the "testAnd..." prefix looks like a truncated name — confirm intended name.
    @Test
    public void testAndSortedByKeysArePrefixOfGroupbyKeys()
    {
        QueryRunner queryRunner = getQueryRunner();

        try {
            queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer WITH ( \n" +
                    " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" +
                    " sorted_by = ARRAY['custkey', 'name'], partitioned_by=array['ds'], \n" +
                    " format = 'DWRF' ) AS \n" +
                    "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

            assertPlan(
                    orderBasedExecutionEnabled(),
                    "SELECT custkey, name, nationkey, COUNT(*) FROM test_segmented_aggregation_customer \n" +
                    "WHERE ds = '2021-07-11' GROUP BY 1, 2, 3",
                    anyTree(aggregation(
                            singleGroupingSet("custkey", "name", "nationkey"),
                            ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())),
                            ImmutableList.of("custkey", "name"), // segmented streaming
                            ImmutableMap.of(),
                            Optional.empty(),
                            SINGLE,
                            tableScan("test_segmented_aggregation_customer", ImmutableMap.of("custkey", "custkey", "name", "name", "nationkey", "nationkey")))));
        }
        finally {
            // Clean up the table even when the plan assertion fails.
            queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer");
        }
    }

    /**
     * Table bucketed by (custkey, name) and sorted by custkey only; grouping by
     * (name, custkey) covers the sorted-by key, so segmented aggregation is
     * expected on (custkey).
     */
    @Test
    public void testSortedByPrefixOfBucketedKeys()
    {
        QueryRunner queryRunner = getQueryRunner();

        try {
            queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer2 WITH ( \n" +
                    " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name'], \n" +
                    " sorted_by = ARRAY['custkey'], partitioned_by=array['ds'], \n" +
                    " format = 'DWRF' ) AS \n" +
                    "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

            // can enable segmented aggregation
            assertPlan(orderBasedExecutionEnabled(),
                    "SELECT name, custkey, COUNT(*) FROM test_segmented_aggregation_customer2 \n" +
                    "WHERE ds = '2021-07-11' GROUP BY 1, 2",
                    anyTree(aggregation(
                            singleGroupingSet("name", "custkey"),
                            ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())),
                            ImmutableList.of("custkey"), // segmented aggregation
                            ImmutableMap.of(),
                            Optional.empty(),
                            SINGLE,
                            tableScan("test_segmented_aggregation_customer2", ImmutableMap.of("name", "name", "custkey", "custkey")))));
        }
        finally {
            // Clean up the table even when the plan assertion fails.
            queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer2");
        }
    }

    /**
     * Table sorted by (custkey, phone); the group-by keys (name, custkey,
     * nationkey) share only the leading sorted-by key "custkey", which is enough
     * for segmented aggregation on (custkey).
     */
    @Test
    public void testGroupByKeysShareElementsAsSortedByKeysPrefix()
    {
        QueryRunner queryRunner = getQueryRunner();

        try {
            queryRunner.execute("CREATE TABLE test_segmented_aggregation_customer_share_elements WITH ( \n" +
                    " bucket_count = 4, bucketed_by = ARRAY['custkey', 'name', 'nationkey'], \n" +
                    " sorted_by = ARRAY['custkey', 'phone'], partitioned_by=array['ds'], \n" +
                    " format = 'DWRF' ) AS \n" +
                    "SELECT *, '2021-07-11' as ds FROM customer LIMIT 1000\n");

            // can enable segmented aggregation
            assertPlan(orderBasedExecutionEnabled(),
                    "SELECT name, custkey, nationkey, COUNT(*) FROM test_segmented_aggregation_customer_share_elements \n" +
                    "WHERE ds = '2021-07-11' GROUP BY 1, 2, 3",
                    anyTree(aggregation(
                            singleGroupingSet("name", "custkey", "nationkey"),
                            ImmutableMap.of(Optional.empty(), functionCall("count", ImmutableList.of())),
                            ImmutableList.of("custkey"), // segmented aggregation
                            ImmutableMap.of(),
                            Optional.empty(),
                            SINGLE,
                            tableScan("test_segmented_aggregation_customer_share_elements", ImmutableMap.of("name", "name", "custkey", "custkey", "nationkey", "nationkey")))));
        }
        finally {
            // Clean up the table even when the plan assertion fails.
            queryRunner.execute("DROP TABLE IF EXISTS test_segmented_aggregation_customer_share_elements");
        }
    }

    /**
     * Builds a session with the Hive ORDER_BASED_EXECUTION_ENABLED catalog property
     * and the system SEGMENTED_AGGREGATION_ENABLED property both set to "true".
     */
    private Session orderBasedExecutionEnabled()
    {
        return Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, ORDER_BASED_EXECUTION_ENABLED, "true")
                .setSystemProperty(SEGMENTED_AGGREGATION_ENABLED, "true")
                .build();
    }
}
Loading