Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public class HiveClientConfig

private boolean s3SelectPushdownEnabled;
private int s3SelectPushdownMaxConnections = 500;
private boolean streamingAggregationEnabled;

private boolean isTemporaryStagingDirectoryEnabled = true;
private String temporaryStagingDirectoryPath = "/tmp/presto-${USER}";
Expand Down Expand Up @@ -206,6 +207,7 @@ public class HiveClientConfig
private boolean userDefinedTypeEncodingEnabled;

private boolean columnIndexFilterEnabled;
private boolean fileSplittable = true;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @arunthirupathi in case this could be useful for delta integration.


@Min(0)
public int getMaxInitialSplits()
Expand Down Expand Up @@ -1355,6 +1357,19 @@ public HiveClientConfig setS3SelectPushdownMaxConnections(int s3SelectPushdownMa
return this;
}

/**
 * Reports whether streaming aggregation execution is enabled in this config.
 *
 * @return the current value of the {@code streamingAggregationEnabled} field
 */
public boolean isStreamingAggregationEnabled()
{
    return this.streamingAggregationEnabled;
}

/**
 * Sets whether streaming aggregation execution is enabled.
 * The setter is annotated with the {@code hive.streaming-aggregation-enabled} config key.
 *
 * @param streamingAggregationEnabled the new value of the flag
 * @return this config instance, so calls can be chained
 */
@Config("hive.streaming-aggregation-enabled")
@ConfigDescription("Enable streaming aggregation execution")
public HiveClientConfig setStreamingAggregationEnabled(boolean streamingAggregationEnabled)
{
this.streamingAggregationEnabled = streamingAggregationEnabled;
return this;
}

public boolean isTemporaryStagingDirectoryEnabled()
{
return isTemporaryStagingDirectoryEnabled;
Expand Down Expand Up @@ -1758,4 +1773,17 @@ public HiveClientConfig setUseRecordPageSourceForCustomSplit(boolean useRecordPa
this.useRecordPageSourceForCustomSplit = useRecordPageSourceForCustomSplit;
return this;
}

/**
 * Reports whether hive files may be split when the coordinator schedules splits.
 *
 * @return the current value of the {@code fileSplittable} field
 */
public boolean isFileSplittable()
{
    return this.fileSplittable;
}

/**
 * Sets whether hive files may be split when the coordinator schedules splits.
 * The setter is annotated with the {@code hive.file-splittable} config key.
 *
 * @param fileSplittable {@code false} to make hive files un-splittable, {@code true} otherwise
 * @return this config instance, so calls can be chained
 */
@Config("hive.file-splittable")
@ConfigDescription("By default, this value is true. Set to false to make a hive file un-splittable when coordinator schedules splits.")
public HiveClientConfig setFileSplittable(boolean fileSplittable)
{
this.fileSplittable = fileSplittable;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.DiscretePredicates;
import com.facebook.presto.spi.InMemoryRecordSet;
import com.facebook.presto.spi.LocalProperty;
import com.facebook.presto.spi.MaterializedViewNotFoundException;
import com.facebook.presto.spi.MaterializedViewStatus;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.SortingProperty;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.TableLayoutFilterCoverage;
import com.facebook.presto.spi.TableNotFoundException;
Expand Down Expand Up @@ -222,6 +224,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType;
import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable;
import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
Expand Down Expand Up @@ -2758,14 +2761,46 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
predicate = createPredicate(partitionColumns, partitions);
}

// Expose ordering property of the table.
ImmutableList.Builder<LocalProperty<ColumnHandle>> localProperties = ImmutableList.builder();
Optional<Set<ColumnHandle>> streamPartitionColumns = Optional.empty();
if (table.getStorage().getBucketProperty().isPresent() && !table.getStorage().getBucketProperty().get().getSortedBy().isEmpty()) {
ImmutableSet.Builder<ColumnHandle> streamPartitionColumnsBuilder = ImmutableSet.builder();

// streamPartitionColumns is how we partition the data across splits.
// localProperty is how we partition the data within a split.
// 1. add partition columns to streamPartitionColumns
partitionColumns.forEach(streamPartitionColumnsBuilder::add);

// 2. add sorted columns to streamPartitionColumns and localProperties
HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty().get();
Map<String, ColumnHandle> columnHandles = hiveColumnHandles(table).stream()
.collect(toImmutableMap(HiveColumnHandle::getName, identity()));
bucketProperty.getSortedBy().forEach(sortingColumn -> {
ColumnHandle columnHandle = columnHandles.get(sortingColumn.getColumnName());
localProperties.add(new SortingProperty<>(columnHandle, sortingColumn.getOrder().getSortOrder()));
streamPartitionColumnsBuilder.add(columnHandle);
});

// We currently only set streamPartitionColumns when streaming aggregation is enabled and the table is eligible for it, i.e.:
// 1. The bucket columns are the same as a prefix of the sort columns.
// 2. All rows of the same value group are guaranteed to be in the same split. We disable splitting a file when isStreamingAggregationEnabled is true to guarantee this property.
List<String> sortColumns = bucketProperty.getSortedBy().stream().map(SortingColumn::getColumnName).collect(toImmutableList());
if (bucketProperty.getBucketedBy().size() <= sortColumns.size()
&& bucketProperty.getBucketedBy().containsAll(sortColumns.subList(0, bucketProperty.getBucketedBy().size()))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the prefix match need to be ordered? I assume an equality condition, and hence the order does not matter.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, the order doesn't matter here. StreamPartitionColumns is similar to the bucket column concept, but it is for splits instead of files.

&& isStreamingAggregationEnabled(session)) {
streamPartitionColumns = Optional.of(streamPartitionColumnsBuilder.build());
}
}

return new ConnectorTableLayout(
hiveLayoutHandle,
Optional.empty(),
predicate,
tablePartitioning,
Optional.empty(),
streamPartitionColumns,
discretePredicates,
ImmutableList.of(),
localProperties.build(),
Optional.of(hiveLayoutHandle.getRemainingPredicate()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public final class HiveSessionProperties
public static final String PARTITION_STATISTICS_BASED_OPTIMIZATION_ENABLED = "partition_stats_based_optimization_enabled";
private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count";
private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
public static final String STREAMING_AGGREGATION_ENABLED = "streaming_aggregation_enabled";
public static final String SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE = "shuffle_partitioned_columns_for_table_write";
public static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path";
Expand Down Expand Up @@ -135,6 +136,7 @@ public final class HiveSessionProperties
public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
private static final String USE_RECORD_PAGE_SOURCE_FOR_CUSTOM_SPLIT = "use_record_page_source_for_custom_split";
public static final String MAX_INITIAL_SPLITS = "max_initial_splits";
public static final String FILE_SPLITTABLE = "file_splittable";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -413,6 +415,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"S3 Select pushdown enabled",
hiveClientConfig.isS3SelectPushdownEnabled(),
false),
booleanProperty(
STREAMING_AGGREGATION_ENABLED,
"Enable streaming aggregation execution",
hiveClientConfig.isStreamingAggregationEnabled(),
false),
booleanProperty(
TEMPORARY_STAGING_DIRECTORY_ENABLED,
"Should use temporary staging directory for write operations",
Expand Down Expand Up @@ -663,6 +670,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
MAX_INITIAL_SPLITS,
"Hive max initial split count",
hiveClientConfig.getMaxInitialSplits(),
true),
booleanProperty(
FILE_SPLITTABLE,
"If a hive file is splittable when coordinator schedules splits",
hiveClientConfig.isFileSplittable(),
true));
}

Expand Down Expand Up @@ -883,6 +895,11 @@ public static boolean isS3SelectPushdownEnabled(ConnectorSession session)
return session.getProperty(S3_SELECT_PUSHDOWN_ENABLED, Boolean.class);
}

/**
 * Reads the boolean session property named by {@code STREAMING_AGGREGATION_ENABLED}.
 *
 * @param session the connector session to read the property from
 * @return the value of the session property
 */
public static boolean isStreamingAggregationEnabled(ConnectorSession session)
{
    Boolean streamingAggregation = session.getProperty(STREAMING_AGGREGATION_ENABLED, Boolean.class);
    return streamingAggregation;
}

public static boolean isStatisticsEnabled(ConnectorSession session)
{
return session.getProperty(STATISTICS_ENABLED, Boolean.class);
Expand Down Expand Up @@ -1148,4 +1165,9 @@ public static int getHiveMaxInitialSplitSize(ConnectorSession session)
{
return session.getProperty(MAX_INITIAL_SPLITS, Integer.class);
}

/**
 * Reads the boolean session property named by {@code FILE_SPLITTABLE}.
 *
 * @param session the connector session to read the property from
 * @return the value of the session property
 */
public static boolean isFileSplittable(ConnectorSession session)
{
    Boolean splittable = session.getProperty(FILE_SPLITTABLE, Boolean.class);
    return splittable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import static com.facebook.presto.hive.HiveMetadata.shouldCreateFilesForMissingBuckets;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.hive.HiveSessionProperties.isFileSplittable;
import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
import static com.facebook.presto.hive.HiveUtil.getFooterCount;
import static com.facebook.presto.hive.HiveUtil.getHeaderCount;
Expand Down Expand Up @@ -260,11 +262,16 @@ public ListenableFuture<?> loadPartition(HivePartitionMetadata partition, HiveSp
return addSplitsToSource(splits, splitFactory, hiveSplitSource, stopped);
}
PathFilter pathFilter = isHudiParquetInputFormat(inputFormat) ? hoodiePathFilterLoadingCache.getUnchecked(configuration) : path1 -> true;
// Streaming aggregation works at the granularity of individual files,
// S3 Select pushdown works at the granularity of individual S3 objects, and
// partial aggregation pushdown works at the granularity of individual files;
// therefore we must not split files when any of them is enabled.
// Files with skipped header / footer lines are not splittable, except for the special case when skip.header.line.count=1
boolean splittable = !s3SelectPushdownEnabled && !partialAggregationsPushedDown && getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1;
boolean splittable = isFileSplittable(session) &&
!isStreamingAggregationEnabled(session) &&
!s3SelectPushdownEnabled &&
!partialAggregationsPushedDown &&
getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1;

// Bucketed partitions are fully loaded immediately since all files must be loaded to determine the file to bucket mapping
if (tableBucketInfo.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public void testDefaults()
.setPartitionStatisticsBasedOptimizationEnabled(false)
.setS3SelectPushdownEnabled(false)
.setS3SelectPushdownMaxConnections(500)
.setStreamingAggregationEnabled(false)
.setTemporaryStagingDirectoryEnabled(true)
.setTemporaryStagingDirectoryPath("/tmp/presto-${USER}")
.setTemporaryTableSchema("default")
Expand Down Expand Up @@ -161,7 +162,8 @@ public void testDefaults()
.setSizeBasedSplitWeightsEnabled(true)
.setMinimumAssignedSplitWeight(0.05)
.setUserDefinedTypeEncodingEnabled(false)
.setUseRecordPageSourceForCustomSplit(true));
.setUseRecordPageSourceForCustomSplit(true)
.setFileSplittable(true));
}

@Test
Expand Down Expand Up @@ -246,6 +248,7 @@ public void testExplicitPropertyMappings()
.put("hive.partition-statistics-based-optimization-enabled", "true")
.put("hive.s3select-pushdown.enabled", "true")
.put("hive.s3select-pushdown.max-connections", "1234")
.put("hive.streaming-aggregation-enabled", "true")
.put("hive.temporary-staging-directory-enabled", "false")
.put("hive.temporary-staging-directory-path", "updated")
.put("hive.temporary-table-schema", "other")
Expand Down Expand Up @@ -284,6 +287,7 @@ public void testExplicitPropertyMappings()
.put("hive.user-defined-type-encoding-enabled", "true")
.put("hive.minimum-assigned-split-weight", "1.0")
.put("hive.use-record-page-source-for-custom-split", "false")
.put("hive.file-splittable", "false")
.build();

HiveClientConfig expected = new HiveClientConfig()
Expand Down Expand Up @@ -365,6 +369,7 @@ public void testExplicitPropertyMappings()
.setPartitionStatisticsBasedOptimizationEnabled(true)
.setS3SelectPushdownEnabled(true)
.setS3SelectPushdownMaxConnections(1234)
.setStreamingAggregationEnabled(true)
.setTemporaryStagingDirectoryEnabled(false)
.setTemporaryStagingDirectoryPath("updated")
.setTemporaryTableSchema("other")
Expand Down Expand Up @@ -402,7 +407,8 @@ public void testExplicitPropertyMappings()
.setSizeBasedSplitWeightsEnabled(false)
.setMinimumAssignedSplitWeight(1.0)
.setUserDefinedTypeEncodingEnabled(true)
.setUseRecordPageSourceForCustomSplit(false);
.setUseRecordPageSourceForCustomSplit(false)
.setFileSplittable(false);

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
Loading