Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ Property Name Description

``iceberg.pushdown-filter-enabled`` Experimental: Enable filter pushdown for Iceberg. This is ``false``
only supported with Native Worker.

``iceberg.rows-for-metadata-optimization-threshold`` The maximum number of partitions in an Iceberg table to ``1000``
allow optimizing queries of that table using metadata. If
an Iceberg table has more partitions than this threshold,
metadata optimization is skipped.

Set to ``0`` to disable metadata optimization.
======================================================= ============================================================= ============

Table Properties
Expand Down Expand Up @@ -304,14 +311,17 @@ Session Properties

Session properties set behavior changes for queries executed within the given session.

============================================= ======================================================================
Property Name Description
============================================= ======================================================================
``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property
``iceberg.delete-as-join-rewrite-enabled`` in the current session.
``iceberg.hive_statistics_merge_strategy`` Overrides the behavior of the connector property
``iceberg.hive-statistics-merge-strategy`` in the current session.
============================================= ======================================================================
===================================================== ======================================================================
Property Name Description
===================================================== ======================================================================
``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property
``iceberg.delete-as-join-rewrite-enabled`` in the current session.
``iceberg.hive_statistics_merge_strategy`` Overrides the behavior of the connector property
``iceberg.hive-statistics-merge-strategy`` in the current session.
``iceberg.rows_for_metadata_optimization_threshold`` Overrides the behavior of the connector property
``iceberg.rows-for-metadata-optimization-threshold`` in the current
session.
===================================================== ======================================================================

Caching Support
----------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.expressions.LogicalRowExpressions.binaryExpression;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR;
import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID;
import static com.facebook.presto.hive.rule.FilterPushdownUtils.isEntireColumn;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -53,7 +54,7 @@ private MetadataUtils() {}
public static Optional<DiscretePredicates> getDiscretePredicates(List<ColumnHandle> partitionColumns, List<HivePartition> partitions)
{
Optional<DiscretePredicates> discretePredicates = Optional.empty();
if (!partitionColumns.isEmpty()) {
if (!partitionColumns.isEmpty() && !(partitions.size() == 1 && partitions.get(0).getPartitionId().equals(UNPARTITIONED_ID))) {
// Do not create tuple domains for every partition at the same time!
// There can be a huge number of partitions so use an iterable so
// all domains do not need to be in memory at the same time.
Expand Down Expand Up @@ -107,6 +108,9 @@ public static TupleDomain<ColumnHandle> createPredicate(List<ColumnHandle> parti
if (partitions.isEmpty()) {
return TupleDomain.none();
}
if (partitions.size() == 1 && partitions.get(0).getPartitionId().equals(UNPARTITIONED_ID)) {
return TupleDomain.all();
}

return withColumnDomains(
partitionColumns.stream()
Expand Down
6 changes: 6 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-parser</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-analyzer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
Expand Down Expand Up @@ -215,10 +216,24 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
IcebergTableHandle handle = (IcebergTableHandle) table;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());

List<IcebergColumnHandle> partitionColumns = getPartitionKeyColumnHandles(icebergTable, typeManager);
List<IcebergColumnHandle> partitionColumns = getPartitionKeyColumnHandles(handle, icebergTable, typeManager);
TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), Predicates.in(partitionColumns)));
Optional<Set<IcebergColumnHandle>> requestedColumns = desiredColumns.map(columns -> columns.stream().map(column -> (IcebergColumnHandle) column).collect(toImmutableSet()));

List<HivePartition> partitions;
if (handle.getIcebergTableName().getTableType() == CHANGELOG ||
handle.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
partitions = ImmutableList.of(new HivePartition(handle.getSchemaTableName()));
}
else {
partitions = getPartitions(
typeManager,
handle,
icebergTable,
constraint,
partitionColumns);
}

ConnectorTableLayout layout = getTableLayout(
session,
new IcebergTableLayoutHandle.Builder()
Expand All @@ -230,7 +245,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
.setRequestedColumns(requestedColumns)
.setPushdownFilterEnabled(isPushdownFilterEnabled(session))
.setPartitionColumnPredicate(partitionColumnPredicate)
.setPartitions(Optional.empty())
.setPartitions(Optional.ofNullable(partitions.size() == 0 ? null : partitions))
.setTable(handle)
.build());
return new ConnectorTableLayoutResult(layout, constraint.getSummary());
Expand Down Expand Up @@ -259,19 +274,19 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
Table icebergTable = getIcebergTable(session, tableHandle.getSchemaTableName());
validateTableMode(session, icebergTable);
List<ColumnHandle> partitionColumns = ImmutableList.copyOf(icebergTableLayoutHandle.getPartitionColumns());
Optional<List<HivePartition>> partitions = icebergTableLayoutHandle.getPartitions();
Optional<DiscretePredicates> discretePredicates = partitions.flatMap(parts -> getDiscretePredicates(partitionColumns, parts));
if (!isPushdownFilterEnabled(session)) {
return new ConnectorTableLayout(
icebergTableLayoutHandle,
Optional.empty(),
icebergTableLayoutHandle.getPartitionColumnPredicate(),
Optional.empty(),
partitions.isPresent() ? icebergTableLayoutHandle.getPartitionColumnPredicate() : TupleDomain.none(),
Optional.empty(),
Optional.empty(),
discretePredicates,
ImmutableList.of(),
Optional.empty());
}
Optional<List<HivePartition>> partitions = icebergTableLayoutHandle.getPartitions();
Optional<DiscretePredicates> discretePredicates = partitions.flatMap(parts -> getDiscretePredicates(partitionColumns, parts));

Map<String, ColumnHandle> predicateColumns = icebergTableLayoutHandle.getPredicateColumns().entrySet()
.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class IcebergConfig
private double statisticSnapshotRecordDifferenceWeight;
private boolean pushdownFilterEnabled;
private boolean deleteAsJoinRewriteEnabled = true;
private int rowsForMetadataOptimizationThreshold = 1000;

private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
private String fileIOImpl = HadoopFileIO.class.getName();
Expand Down Expand Up @@ -251,6 +252,20 @@ public boolean isDeleteAsJoinRewriteEnabled()
return deleteAsJoinRewriteEnabled;
}

/**
 * Sets the threshold bound to the {@code iceberg.rows-for-metadata-optimization-threshold}
 * configuration property.
 *
 * <p>Per the config description below, the value is the maximum number of partitions for
 * which metadata optimization is used; {@code 0} skips metadata optimization.
 *
 * @param rowsForMetadataOptimizationThreshold the new threshold value
 * @return this {@code IcebergConfig} instance, so that setters can be chained
 */
@Config("iceberg.rows-for-metadata-optimization-threshold")
@ConfigDescription("The max partitions number to utilize metadata optimization. 0 means skip the metadata optimization directly.")
public IcebergConfig setRowsForMetadataOptimizationThreshold(int rowsForMetadataOptimizationThreshold)
{
this.rowsForMetadataOptimizationThreshold = rowsForMetadataOptimizationThreshold;
return this;
}

/**
 * Returns the currently configured metadata optimization threshold.
 *
 * <p>The getter is annotated with {@code @Min(0)}; presumably negative values are rejected
 * when the configuration is validated (NOTE(review): confirm against the config framework).
 *
 * @return the threshold value stored in this config
 */
@Min(0)
public int getRowsForMetadataOptimizationThreshold()
{
return rowsForMetadataOptimizationThreshold;
}

public boolean getManifestCachingEnabled()
{
return manifestCachingEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public final class IcebergSessionProperties
public static final String DELETE_AS_JOIN_REWRITE_ENABLED = "delete_as_join_rewrite_enabled";
public static final String HIVE_METASTORE_STATISTICS_MERGE_STRATEGY = "hive_statistics_merge_strategy";
public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -185,6 +186,13 @@ public IcebergSessionProperties(
DELETE_AS_JOIN_REWRITE_ENABLED,
"When enabled equality delete row filtering will be pushed down into a join.",
icebergConfig.isDeleteAsJoinRewriteEnabled(),
false),
integerProperty(
ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD,
"The max partitions number to utilize metadata optimization. When partitions number " +
"of an Iceberg table exceeds this threshold, metadata optimization would be skipped for " +
"the table. A value of 0 means skip metadata optimization directly.",
icebergConfig.getRowsForMetadataOptimizationThreshold(),
false));
}

Expand Down Expand Up @@ -295,4 +303,9 @@ public static boolean isDeleteToJoinPushdownEnabled(ConnectorSession session)
{
return session.getProperty(DELETE_AS_JOIN_REWRITE_ENABLED, Boolean.class);
}

/**
 * Reads the {@code rows_for_metadata_optimization_threshold} session property.
 *
 * @param session the connector session to read the property from
 * @return the property value for the given session
 */
public static int getRowsForMetadataOptimizationThreshold(ConnectorSession session)
{
    Integer threshold = session.getProperty(ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD, Integer.class);
    return threshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
package com.facebook.presto.iceberg;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.Utils;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.type.TimeType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.HivePartitionKey;
import com.facebook.presto.iceberg.delete.IcebergDeletePageSink;
Expand All @@ -38,13 +35,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static com.facebook.presto.common.Utils.nativeValueToBlock;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static com.facebook.presto.iceberg.IcebergUtil.deserializePartitionValue;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class IcebergUpdateablePageSource
implements UpdatablePageSource
Expand Down Expand Up @@ -226,12 +222,4 @@ protected void closeWithSuppression(Throwable throwable)
}
}
}

/**
 * Builds a block for a prefilled value by delegating to {@code Utils.nativeValueToBlock}.
 *
 * <p>A non-null value whose type is a {@code TimestampType} with {@code MILLISECONDS}
 * precision, or any {@code TimeType}, is first passed through
 * {@code MICROSECONDS.toMillis(...)}; every other value (including {@code null}) is
 * handed over unchanged.
 *
 * @param type the type of the value
 * @param prefilledValue the native value, possibly {@code null}
 * @return the block produced by {@code Utils.nativeValueToBlock}
 */
private Block nativeValueToBlock(Type type, Object prefilledValue)
{
    // Null values never need unit conversion.
    if (prefilledValue == null) {
        return Utils.nativeValueToBlock(type, prefilledValue);
    }

    boolean isMillisTimestamp = type instanceof TimestampType
            && ((TimestampType) type).getPrecision() == MILLISECONDS;
    if (isMillisTimestamp || type instanceof TimeType) {
        long millis = MICROSECONDS.toMillis((long) prefilledValue);
        return Utils.nativeValueToBlock(type, millis);
    }

    return Utils.nativeValueToBlock(type, prefilledValue);
}
}
Loading