diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 8054439473844..baca992dee1d0 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -240,6 +240,13 @@ Property Name Description ``iceberg.pushdown-filter-enabled`` Experimental: Enable filter pushdown for Iceberg. This is ``false`` only supported with Native Worker. + +``iceberg.rows-for-metadata-optimization-threshold`` The maximum number of partitions in an Iceberg table to ``1000`` + allow optimizing queries of that table using metadata. If + an Iceberg table has more partitions than this threshold, + metadata optimization is skipped. + + Set to ``0`` to disable metadata optimization. ======================================================= ============================================================= ============ Table Properties @@ -304,14 +311,17 @@ Session Properties Session properties set behavior changes for queries executed within the given session. -============================================= ====================================================================== -Property Name Description -============================================= ====================================================================== -``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property - ``iceberg.delete-as-join-rewrite-enabled`` in the current session. -``iceberg.hive_statistics_merge_strategy`` Overrides the behavior of the connector property - ``iceberg.hive-statistics-merge-strategy`` in the current session. 
-============================================= ====================================================================== +===================================================== ====================================================================== +Property Name Description +===================================================== ====================================================================== +``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property + ``iceberg.delete-as-join-rewrite-enabled`` in the current session. +``iceberg.hive_statistics_merge_strategy`` Overrides the behavior of the connector property + ``iceberg.hive-statistics-merge-strategy`` in the current session. +``iceberg.rows_for_metadata_optimization_threshold`` Overrides the behavior of the connector property + ``iceberg.rows-for-metadata-optimization-threshold`` in the current + session. +===================================================== ====================================================================== Caching Support ---------------- diff --git a/presto-hive-common/src/main/java/com/facebook/presto/hive/MetadataUtils.java b/presto-hive-common/src/main/java/com/facebook/presto/hive/MetadataUtils.java index ac417b365f974..361709b7e496b 100644 --- a/presto-hive-common/src/main/java/com/facebook/presto/hive/MetadataUtils.java +++ b/presto-hive-common/src/main/java/com/facebook/presto/hive/MetadataUtils.java @@ -39,6 +39,7 @@ import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; import static com.facebook.presto.expressions.LogicalRowExpressions.binaryExpression; import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNKNOWN_ERROR; +import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID; import static com.facebook.presto.hive.rule.FilterPushdownUtils.isEntireColumn; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -53,7 
+54,7 @@ private MetadataUtils() {} public static Optional getDiscretePredicates(List partitionColumns, List partitions) { Optional discretePredicates = Optional.empty(); - if (!partitionColumns.isEmpty()) { + if (!partitionColumns.isEmpty() && !(partitions.size() == 1 && partitions.get(0).getPartitionId().equals(UNPARTITIONED_ID))) { // Do not create tuple domains for every partition at the same time! // There can be a huge number of partitions so use an iterable so // all domains do not need to be in memory at the same time. @@ -107,6 +108,9 @@ public static TupleDomain createPredicate(List parti if (partitions.isEmpty()) { return TupleDomain.none(); } + if (partitions.size() == 1 && partitions.get(0).getPartitionId().equals(UNPARTITIONED_ID)) { + return TupleDomain.all(); + } return withColumnDomains( partitionColumns.stream() diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index 087f575cd28e9..24075daaf62c5 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -500,6 +500,12 @@ test + + com.facebook.presto + presto-parser + test + + com.facebook.presto presto-analyzer diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index d85ae11cd2ead..b95ce83575abf 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -129,6 +129,7 @@ import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode; import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles; +import static com.facebook.presto.iceberg.IcebergUtil.getPartitions; import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdAsOfTime; import static com.facebook.presto.iceberg.IcebergUtil.getTableComment; import 
static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName; @@ -215,10 +216,24 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint( IcebergTableHandle handle = (IcebergTableHandle) table; Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); - List partitionColumns = getPartitionKeyColumnHandles(icebergTable, typeManager); + List partitionColumns = getPartitionKeyColumnHandles(handle, icebergTable, typeManager); TupleDomain partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), Predicates.in(partitionColumns))); Optional> requestedColumns = desiredColumns.map(columns -> columns.stream().map(column -> (IcebergColumnHandle) column).collect(toImmutableSet())); + List partitions; + if (handle.getIcebergTableName().getTableType() == CHANGELOG || + handle.getIcebergTableName().getTableType() == EQUALITY_DELETES) { + partitions = ImmutableList.of(new HivePartition(handle.getSchemaTableName())); + } + else { + partitions = getPartitions( + typeManager, + handle, + icebergTable, + constraint, + partitionColumns); + } + ConnectorTableLayout layout = getTableLayout( session, new IcebergTableLayoutHandle.Builder() @@ -230,7 +245,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint( .setRequestedColumns(requestedColumns) .setPushdownFilterEnabled(isPushdownFilterEnabled(session)) .setPartitionColumnPredicate(partitionColumnPredicate) - .setPartitions(Optional.empty()) + .setPartitions(Optional.ofNullable(partitions.size() == 0 ? 
null : partitions)) .setTable(handle) .build()); return new ConnectorTableLayoutResult(layout, constraint.getSummary()); @@ -259,19 +274,19 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa Table icebergTable = getIcebergTable(session, tableHandle.getSchemaTableName()); validateTableMode(session, icebergTable); List partitionColumns = ImmutableList.copyOf(icebergTableLayoutHandle.getPartitionColumns()); + Optional> partitions = icebergTableLayoutHandle.getPartitions(); + Optional discretePredicates = partitions.flatMap(parts -> getDiscretePredicates(partitionColumns, parts)); if (!isPushdownFilterEnabled(session)) { return new ConnectorTableLayout( icebergTableLayoutHandle, Optional.empty(), - icebergTableLayoutHandle.getPartitionColumnPredicate(), - Optional.empty(), + partitions.isPresent() ? icebergTableLayoutHandle.getPartitionColumnPredicate() : TupleDomain.none(), Optional.empty(), Optional.empty(), + discretePredicates, ImmutableList.of(), Optional.empty()); } - Optional> partitions = icebergTableLayoutHandle.getPartitions(); - Optional discretePredicates = partitions.flatMap(parts -> getDiscretePredicates(partitionColumns, parts)); Map predicateColumns = icebergTableLayoutHandle.getPredicateColumns().entrySet() .stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java index 9b370b4d3576c..d2efa5a5becd4 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java @@ -52,6 +52,7 @@ public class IcebergConfig private double statisticSnapshotRecordDifferenceWeight; private boolean pushdownFilterEnabled; private boolean deleteAsJoinRewriteEnabled = true; + private int rowsForMetadataOptimizationThreshold = 1000; private EnumSet 
hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class); private String fileIOImpl = HadoopFileIO.class.getName(); @@ -251,6 +252,20 @@ public boolean isDeleteAsJoinRewriteEnabled() return deleteAsJoinRewriteEnabled; } + @Config("iceberg.rows-for-metadata-optimization-threshold") + @ConfigDescription("The max partitions number to utilize metadata optimization. 0 means skip the metadata optimization directly.") + public IcebergConfig setRowsForMetadataOptimizationThreshold(int rowsForMetadataOptimizationThreshold) + { + this.rowsForMetadataOptimizationThreshold = rowsForMetadataOptimizationThreshold; + return this; + } + + @Min(0) + public int getRowsForMetadataOptimizationThreshold() + { + return rowsForMetadataOptimizationThreshold; + } + public boolean getManifestCachingEnabled() { return manifestCachingEnabled; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java index 69061eedf99c8..d28ff1bfd9d40 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java @@ -62,6 +62,7 @@ public final class IcebergSessionProperties public static final String DELETE_AS_JOIN_REWRITE_ENABLED = "delete_as_join_rewrite_enabled"; public static final String HIVE_METASTORE_STATISTICS_MERGE_STRATEGY = "hive_statistics_merge_strategy"; public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight"; + public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold"; private final List> sessionProperties; @@ -185,6 +186,13 @@ public IcebergSessionProperties( DELETE_AS_JOIN_REWRITE_ENABLED, "When enabled equality delete row filtering will be pushed down into a join.", 
icebergConfig.isDeleteAsJoinRewriteEnabled(), + false), + integerProperty( + ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD, + "The max partitions number to utilize metadata optimization. When partitions number " + + "of an Iceberg table exceeds this threshold, metadata optimization would be skipped for " + + "the table. A value of 0 means skip metadata optimization directly.", + icebergConfig.getRowsForMetadataOptimizationThreshold(), false)); } @@ -295,4 +303,9 @@ public static boolean isDeleteToJoinPushdownEnabled(ConnectorSession session) { return session.getProperty(DELETE_AS_JOIN_REWRITE_ENABLED, Boolean.class); } + + public static int getRowsForMetadataOptimizationThreshold(ConnectorSession session) + { + return session.getProperty(ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD, Integer.class); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java index cda88ee8ad026..98b7a4690145d 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUpdateablePageSource.java @@ -14,11 +14,8 @@ package com.facebook.presto.iceberg; import com.facebook.presto.common.Page; -import com.facebook.presto.common.Utils; import com.facebook.presto.common.block.Block; import com.facebook.presto.common.block.RunLengthEncodedBlock; -import com.facebook.presto.common.type.TimeType; -import com.facebook.presto.common.type.TimestampType; import com.facebook.presto.common.type.Type; import com.facebook.presto.hive.HivePartitionKey; import com.facebook.presto.iceberg.delete.IcebergDeletePageSink; @@ -38,13 +35,12 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import static com.facebook.presto.common.Utils.nativeValueToBlock; import static 
com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static com.facebook.presto.iceberg.IcebergUtil.deserializePartitionValue; import static com.google.common.base.Throwables.throwIfInstanceOf; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static java.util.concurrent.TimeUnit.MILLISECONDS; public class IcebergUpdateablePageSource implements UpdatablePageSource @@ -226,12 +222,4 @@ protected void closeWithSuppression(Throwable throwable) } } } - - private Block nativeValueToBlock(Type type, Object prefilledValue) - { - if (prefilledValue != null && (type instanceof TimestampType && ((TimestampType) type).getPrecision() == MILLISECONDS || type instanceof TimeType)) { - return Utils.nativeValueToBlock(type, MICROSECONDS.toMillis((long) prefilledValue)); - } - return Utils.nativeValueToBlock(type, prefilledValue); - } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index cb9d157d6a87e..851ede5294928 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -44,7 +44,6 @@ import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.TableNotFoundException; import com.facebook.presto.spi.connector.ConnectorMetadata; -import com.google.common.base.VerifyException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -56,6 +55,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.HistoryEntry; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.PartitionField; 
import org.apache.iceberg.PartitionSpec; @@ -74,7 +74,6 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.SnapshotUtil; -import org.joda.time.DateTimeZone; import java.io.IOException; import java.io.UncheckedIOException; @@ -110,24 +109,12 @@ import static com.facebook.presto.common.type.SmallintType.SMALLINT; import static com.facebook.presto.common.type.TimeType.TIME; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.common.type.TimestampType.TIMESTAMP_MICROSECONDS; import static com.facebook.presto.common.type.TinyintType.TINYINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.common.type.Varchars.isVarcharType; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; -import static com.facebook.presto.hive.HiveUtil.bigintPartitionKey; -import static com.facebook.presto.hive.HiveUtil.booleanPartitionKey; -import static com.facebook.presto.hive.HiveUtil.charPartitionKey; -import static com.facebook.presto.hive.HiveUtil.datePartitionKey; -import static com.facebook.presto.hive.HiveUtil.doublePartitionKey; -import static com.facebook.presto.hive.HiveUtil.floatPartitionKey; -import static com.facebook.presto.hive.HiveUtil.integerPartitionKey; -import static com.facebook.presto.hive.HiveUtil.longDecimalPartitionKey; -import static com.facebook.presto.hive.HiveUtil.shortDecimalPartitionKey; -import static com.facebook.presto.hive.HiveUtil.smallintPartitionKey; -import static com.facebook.presto.hive.HiveUtil.timestampPartitionKey; -import static com.facebook.presto.hive.HiveUtil.tinyintPartitionKey; -import static com.facebook.presto.hive.HiveUtil.varcharPartitionKey; import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_QUERY_ID_NAME; import static 
com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_VERSION_NAME; import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_VIEW_COMMENT; @@ -136,6 +123,7 @@ import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES; import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent; +import static com.facebook.presto.iceberg.FileFormat.PARQUET; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_FORMAT_VERSION; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; @@ -152,6 +140,7 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Maps.immutableEntry; import static com.google.common.collect.Streams.mapWithIndex; import static com.google.common.collect.Streams.stream; @@ -171,6 +160,7 @@ import static java.util.Collections.emptyIterator; import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_ENABLED; @@ -239,20 +229,22 @@ public static Table getNativeIcebergTable(IcebergResourceFactory resourceFactory return resourceFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table)); } - public static List getPartitionKeyColumnHandles(org.apache.iceberg.Table table, TypeManager typeManager) + public static List 
getPartitionKeyColumnHandles(IcebergTableHandle tableHandle, Table table, TypeManager typeManager) { - ImmutableList.Builder partitionColumns = ImmutableList.builder(); - List allColumns = getColumns(table.schema(), table.spec(), typeManager); - - for (int i = 0; i < table.spec().fields().size(); i++) { - PartitionField field = table.spec().fields().get(i); - if (field.transform().isIdentity()) { - Optional columnHandle = allColumns.stream().filter(icebergColumnHandle -> Objects.equals(icebergColumnHandle.getName(), field.name())).findAny(); - columnHandle.ifPresent(partitionColumns::add); - } - } - - return partitionColumns.build(); + Set partitionSpecs = tableHandle.getIcebergTableName().getSnapshotId() + .map(snapshot -> table.snapshot(snapshot).allManifests(table.io()).stream() + .map(ManifestFile::partitionSpecId) + .map(specId -> table.specs().get(specId)) + .collect(toImmutableSet())) + .orElseGet(() -> ImmutableSet.copyOf(table.specs().values())); // No snapshot, so no data. This case doesn't matter. 
+ + return table.spec().fields().stream() + .filter(field -> field.transform().isIdentity() && + partitionSpecs.stream() + .allMatch(partitionSpec -> partitionSpec.getFieldsBySourceId(field.sourceId()).stream() + .anyMatch(partitionField -> partitionField.transform().isIdentity()))) + .map(field -> IcebergColumnHandle.create(table.schema().findField(field.sourceId()), typeManager, PARTITION_KEY)) + .collect(toImmutableList()); } public static Optional resolveSnapshotIdByName(Table table, IcebergTableName name) @@ -438,7 +430,7 @@ public static Optional tryGetLocation(Table table) } } - private static boolean isValidPartitionType(com.facebook.presto.common.type.Type type) + private static boolean isValidPartitionType(FileFormat fileFormat, Type type) { return type instanceof DecimalType || BOOLEAN.equals(type) || @@ -449,145 +441,30 @@ private static boolean isValidPartitionType(com.facebook.presto.common.type.Type REAL.equals(type) || DOUBLE.equals(type) || DATE.equals(type) || - TIMESTAMP.equals(type) || + type instanceof TimestampType || + (TIME.equals(type) && fileFormat == PARQUET) || VARBINARY.equals(type) || isVarcharType(type) || isCharType(type); } - private static void verifyPartitionTypeSupported(String partitionName, com.facebook.presto.common.type.Type type) + private static void verifyPartitionTypeSupported(FileFormat fileFormat, String partitionName, Type type) { - if (!isValidPartitionType(type)) { + if (!isValidPartitionType(fileFormat, type)) { throw new PrestoException(NOT_SUPPORTED, format("Unsupported type [%s] for partition: %s", type, partitionName)); } } private static NullableValue parsePartitionValue( - String partitionName, - String value, - com.facebook.presto.common.type.Type type, - DateTimeZone timeZone) + FileFormat fileFormat, + String partitionStringValue, + Type prestoType, + String partitionName) { - verifyPartitionTypeSupported(partitionName, type); - - if (type instanceof DecimalType) { - DecimalType decimalType = (DecimalType) 
type; - if (value == null) { - return NullableValue.asNull(decimalType); - } - if (decimalType.isShort()) { - if (value.isEmpty()) { - return NullableValue.of(decimalType, 0L); - } - return NullableValue.of(decimalType, shortDecimalPartitionKey(value, decimalType, partitionName)); - } - else { - if (value.isEmpty()) { - return NullableValue.of(decimalType, Decimals.encodeUnscaledValue(BigInteger.ZERO)); - } - return NullableValue.of(decimalType, longDecimalPartitionKey(value, decimalType, partitionName)); - } - } - - if (BOOLEAN.equals(type)) { - if (value == null) { - return NullableValue.asNull(BOOLEAN); - } - if (value.isEmpty()) { - return NullableValue.of(BOOLEAN, false); - } - return NullableValue.of(BOOLEAN, booleanPartitionKey(value, partitionName)); - } - - if (TINYINT.equals(type)) { - if (value == null) { - return NullableValue.asNull(TINYINT); - } - if (value.isEmpty()) { - return NullableValue.of(TINYINT, 0L); - } - return NullableValue.of(TINYINT, tinyintPartitionKey(value, partitionName)); - } - - if (SMALLINT.equals(type)) { - if (value == null) { - return NullableValue.asNull(SMALLINT); - } - if (value.isEmpty()) { - return NullableValue.of(SMALLINT, 0L); - } - return NullableValue.of(SMALLINT, smallintPartitionKey(value, partitionName)); - } - - if (INTEGER.equals(type)) { - if (value == null) { - return NullableValue.asNull(INTEGER); - } - if (value.isEmpty()) { - return NullableValue.of(INTEGER, 0L); - } - return NullableValue.of(INTEGER, integerPartitionKey(value, partitionName)); - } + verifyPartitionTypeSupported(fileFormat, partitionName, prestoType); - if (BIGINT.equals(type)) { - if (value == null) { - return NullableValue.asNull(BIGINT); - } - if (value.isEmpty()) { - return NullableValue.of(BIGINT, 0L); - } - return NullableValue.of(BIGINT, bigintPartitionKey(value, partitionName)); - } - - if (DATE.equals(type)) { - if (value == null) { - return NullableValue.asNull(DATE); - } - return NullableValue.of(DATE, datePartitionKey(value, 
partitionName)); - } - - if (TIMESTAMP.equals(type)) { - if (value == null) { - return NullableValue.asNull(TIMESTAMP); - } - return NullableValue.of(TIMESTAMP, timestampPartitionKey(value, timeZone, partitionName)); - } - - if (REAL.equals(type)) { - if (value == null) { - return NullableValue.asNull(REAL); - } - if (value.isEmpty()) { - return NullableValue.of(REAL, (long) floatToRawIntBits(0.0f)); - } - return NullableValue.of(REAL, floatPartitionKey(value, partitionName)); - } - - if (DOUBLE.equals(type)) { - if (value == null) { - return NullableValue.asNull(DOUBLE); - } - if (value.isEmpty()) { - return NullableValue.of(DOUBLE, 0.0); - } - return NullableValue.of(DOUBLE, doublePartitionKey(value, partitionName)); - } - - if (isVarcharType(type)) { - if (value == null) { - return NullableValue.asNull(type); - } - return NullableValue.of(type, varcharPartitionKey(value, partitionName, type)); - } - - if (isCharType(type)) { - if (value == null) { - return NullableValue.asNull(type); - } - return NullableValue.of(type, charPartitionKey(value, partitionName, type)); - } - - throw new VerifyException(format("Unhandled type [%s] for partition: %s", type, partitionName)); + Object partitionValue = deserializePartitionValue(prestoType, partitionStringValue, partitionName); + return partitionValue == null ? 
NullableValue.asNull(prestoType) : NullableValue.of(prestoType, partitionValue); } public static List getPartitions( @@ -598,7 +475,7 @@ public static List getPartitions( List partitionColumns) { IcebergTableName name = ((IcebergTableHandle) tableHandle).getIcebergTableName(); - + FileFormat fileFormat = getFileFormat(icebergTable); // Empty iceberg table would cause `snapshotId` not present Optional snapshotId = resolveSnapshotIdByName(icebergTable, name); if (!snapshotId.isPresent()) { @@ -613,6 +490,10 @@ public static List getPartitions( try (CloseableIterable fileScanTasks = tableScan.planFiles()) { for (FileScanTask fileScanTask : fileScanTasks) { + // If exists delete files, skip the metadata optimization based on partition values as they might become incorrect + if (!fileScanTask.deletes().isEmpty()) { + return ImmutableList.of(new HivePartition(((IcebergTableHandle) tableHandle).getSchemaTableName())); + } StructLike partition = fileScanTask.file().partition(); PartitionSpec spec = fileScanTask.spec(); Map fieldToIndex = getIdentityPartitions(spec); @@ -637,12 +518,14 @@ public static List getPartitions( } } - NullableValue partitionValue = parsePartitionValue(partition.toString(), partitionStringValue, toPrestoType(type, typeManager), DateTimeZone.UTC); + NullableValue partitionValue = parsePartitionValue(fileFormat, partitionStringValue, toPrestoType(type, typeManager), partition.toString()); Optional column = partitionColumns.stream() - .filter(icebergColumnHandle -> Objects.equals(icebergColumnHandle.getName(), field.name())) + .filter(icebergColumnHandle -> Objects.equals(icebergColumnHandle.getId(), field.sourceId())) .findAny(); - builder.put(column.get(), partitionValue); + if (column.isPresent()) { + builder.put(column.get(), partitionValue); + } }); Map values = builder.build(); @@ -725,7 +608,10 @@ public static Object deserializePartitionValue(Type type, String valueString, St if (type.equals(DOUBLE)) { return parseDouble(valueString); } - if 
(type.equals(DATE) || type.equals(TIME) || type.equals(TIMESTAMP)) { + if (type.equals(TIMESTAMP) || type.equals(TIME)) { + return MICROSECONDS.toMillis(parseLong(valueString)); + } + if (type.equals(DATE) || type.equals(TIMESTAMP_MICROSECONDS)) { return parseLong(valueString); } if (type instanceof VarcharType) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java index 22ee0a35ffb0b..38f09363270b1 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java @@ -138,7 +138,8 @@ protected ConnectorPushdownFilterResult getConnectorPushdownFilterResult( checkArgument(tableHandle instanceof IcebergTableHandle, "tableHandle must be IcebergTableHandle"); Table icebergTable = getIcebergTable(metadata, session, ((IcebergTableHandle) tableHandle).getSchemaTableName()); - TupleDomain unenforcedConstraint = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), not(Predicates.in(getPartitionKeyColumnHandles(icebergTable, typeManager))))); + List partitionColumns = getPartitionKeyColumnHandles((IcebergTableHandle) tableHandle, icebergTable, typeManager); + TupleDomain unenforcedConstraint = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), not(Predicates.in(partitionColumns)))); TupleDomain domainPredicate = getDomainPredicate(decomposedFilter, unenforcedConstraint); @@ -151,7 +152,6 @@ protected ConnectorPushdownFilterResult getConnectorPushdownFilterResult( Optional> requestedColumns = currentLayoutHandle.map(layout -> ((IcebergTableLayoutHandle) layout).getRequestedColumns()).orElse(Optional.empty()); - List partitionColumns = getPartitionKeyColumnHandles(icebergTable, typeManager); TupleDomain 
partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys( constraint.getSummary().getDomains().get(), Predicates.in(partitionColumns))); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java new file mode 100644 index 0000000000000..e390f45f05fbc --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergMetadataOptimizer.java @@ -0,0 +1,427 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.iceberg.optimizer; + +import com.facebook.presto.common.CatalogSchemaName; +import com.facebook.presto.common.QualifiedObjectName; +import com.facebook.presto.common.predicate.NullableValue; +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.predicate.TupleDomain.ColumnDomain; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.iceberg.IcebergAbstractMetadata; +import com.facebook.presto.iceberg.IcebergTransactionManager; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPlanOptimizer; +import com.facebook.presto.spi.ConnectorPlanRewriter; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableLayout; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.DiscretePredicates; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.StandardErrorCode; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.VariableAllocator; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.function.FunctionHandle; +import com.facebook.presto.spi.function.FunctionMetadata; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.plan.AggregationNode; +import com.facebook.presto.spi.plan.AggregationNode.Aggregation; +import com.facebook.presto.spi.plan.Assignments; +import com.facebook.presto.spi.plan.FilterNode; +import com.facebook.presto.spi.plan.MarkDistinctNode; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.ProjectNode; +import com.facebook.presto.spi.plan.SortNode; +import com.facebook.presto.spi.plan.TableScanNode; +import 
com.facebook.presto.spi.plan.ValuesNode; +import com.facebook.presto.spi.relation.CallExpression; +import com.facebook.presto.spi.relation.ConstantExpression; +import com.facebook.presto.spi.relation.DeterminismEvaluator; +import com.facebook.presto.spi.relation.ExpressionOptimizer.Level; +import com.facebook.presto.spi.relation.RowExpression; +import com.facebook.presto.spi.relation.RowExpressionService; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; +import static com.facebook.presto.iceberg.IcebergSessionProperties.getRowsForMetadataOptimizationThreshold; +import static com.facebook.presto.spi.plan.ProjectNode.Locality.LOCAL; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static java.util.Objects.requireNonNull; + +public class IcebergMetadataOptimizer + implements ConnectorPlanOptimizer +{ + public static final CatalogSchemaName DEFAULT_NAMESPACE = new CatalogSchemaName("presto", "default"); + private static final Set ALLOWED_FUNCTIONS = ImmutableSet.of( + QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, "max"), + QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, "min"), + QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, "approx_distinct")); + + // Min/Max could be folded into LEAST/GREATEST + private static final Map AGGREGATION_SCALAR_MAPPING = 
ImmutableMap.of( + QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, "max"), QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, "greatest"), + QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, "min"), QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, "least")); + + private final FunctionMetadataManager functionMetadataManager; + private final TypeManager typeManager; + private final IcebergTransactionManager icebergTransactionManager; + private final RowExpressionService rowExpressionService; + private final StandardFunctionResolution functionResolution; + + public IcebergMetadataOptimizer(FunctionMetadataManager functionMetadataManager, + TypeManager typeManager, + IcebergTransactionManager icebergTransactionManager, + RowExpressionService rowExpressionService, + StandardFunctionResolution functionResolution) + { + this.functionMetadataManager = requireNonNull(functionMetadataManager, "functionMetadataManager is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.icebergTransactionManager = requireNonNull(icebergTransactionManager, "icebergTransactionManager is null"); + this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null"); + this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); + } + + @Override + public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator) + { + int rowsForMetadataOptimizationThreshold = getRowsForMetadataOptimizationThreshold(session); + Optimizer optimizer = new Optimizer(session, idAllocator, + functionMetadataManager, + typeManager, + icebergTransactionManager, + rowExpressionService, + functionResolution, + rowsForMetadataOptimizationThreshold); + PlanNode rewrittenPlan = ConnectorPlanRewriter.rewriteWith(optimizer, maxSubplan, null); + return rewrittenPlan; + } + + private static class Optimizer + extends ConnectorPlanRewriter + { + private final 
ConnectorSession connectorSession; +    private final PlanNodeIdAllocator idAllocator; +    private final FunctionMetadataManager functionMetadataManager; +    private final TypeManager typeManager; +    private final IcebergTransactionManager icebergTransactionManager; +    private final RowExpressionService rowExpressionService; +    private final StandardFunctionResolution functionResolution; +    private final int rowsForMetadataOptimizationThreshold; + +        private Optimizer(ConnectorSession connectorSession, +                PlanNodeIdAllocator idAllocator, +                FunctionMetadataManager functionMetadataManager, +                TypeManager typeManager, +                IcebergTransactionManager icebergTransactionManager, +                RowExpressionService rowExpressionService, +                StandardFunctionResolution functionResolution, +                int rowsForMetadataOptimizationThreshold) +        { +            checkArgument(rowsForMetadataOptimizationThreshold >= 0, "The value of `rowsForMetadataOptimizationThreshold` should not be less than 0"); +            this.connectorSession = connectorSession; +            this.idAllocator = idAllocator; +            this.functionMetadataManager = functionMetadataManager; +            this.icebergTransactionManager = icebergTransactionManager; +            this.rowExpressionService = rowExpressionService; +            this.functionResolution = functionResolution; +            this.typeManager = typeManager; +            this.rowsForMetadataOptimizationThreshold = rowsForMetadataOptimizationThreshold; +        } + +        @Override +        public PlanNode visitAggregation(AggregationNode node, RewriteContext context) +        { +            // supported functions are only MIN/MAX/APPROX_DISTINCT or distinct aggregates +            for (Aggregation aggregation : node.getAggregations().values()) { +                QualifiedObjectName functionName = functionMetadataManager.getFunctionMetadata(aggregation.getFunctionHandle()).getName(); +                if (!ALLOWED_FUNCTIONS.contains(functionName) && !aggregation.isDistinct()) { +                    return context.defaultRewrite(node); +                } +            } + +            Optional result = findTableScan(node.getSource(), rowExpressionService.getDeterminismEvaluator()); +            if (!result.isPresent()) { + 
return context.defaultRewrite(node); + } + + // verify all outputs of table scan are partition keys + TableScanNode tableScan = result.get(); + + ImmutableMap.Builder columnBuilder = ImmutableMap.builder(); + + List inputs = tableScan.getOutputVariables(); + for (VariableReferenceExpression variable : inputs) { + ColumnHandle column = tableScan.getAssignments().get(variable); + columnBuilder.put(variable, column); + } + + Map columns = columnBuilder.build(); + + // Materialize the list of partitions and replace the TableScan node + // with a Values node + ConnectorTableLayout layout; + if (!tableScan.getTable().getLayout().isPresent()) { + layout = getConnectorMetadata(tableScan.getTable()).getTableLayoutForConstraint(connectorSession, tableScan.getTable().getConnectorHandle(), Constraint.alwaysTrue(), Optional.empty()).getTableLayout(); + } + else { + layout = getConnectorMetadata(tableScan.getTable()).getTableLayout(connectorSession, tableScan.getTable().getLayout().get()); + } + + if (!layout.getDiscretePredicates().isPresent()) { + return context.defaultRewrite(node); + } + + DiscretePredicates discretePredicates = layout.getDiscretePredicates().get(); + + // the optimization is only valid if there is no filter on non-partition columns + if (layout.getPredicate().getColumnDomains().isPresent()) { + List predicateColumns = layout.getPredicate().getColumnDomains().get().stream() + .map(ColumnDomain::getColumn) + .collect(toImmutableList()); + if (!discretePredicates.getColumns().containsAll(predicateColumns)) { + return context.defaultRewrite(node); + } + } + + // Remaining predicate after tuple domain pushdown in getTableLayout(). This doesn't have overlap with discretePredicates. + // So it only references non-partition columns. Disable the optimization in this case. 
+            Optional remainingPredicate = layout.getRemainingPredicate(); +            if (remainingPredicate.isPresent() && !remainingPredicate.get().equals(TRUE_CONSTANT)) { +                return context.defaultRewrite(node); +            } + +            // the optimization is only valid if the aggregation node only relies on partition keys +            if (!discretePredicates.getColumns().containsAll(columns.values())) { +                return context.defaultRewrite(node); +            } + +            if (isReducible(node, inputs)) { +                // Fold min/max aggregations to a constant value +                return reduce(node, inputs, columns, context, discretePredicates); +            } + +            // When `rowsForMetadataOptimizationThreshold == 0`, or the number of partitions exceeds the threshold, skip the optimization +            if (rowsForMetadataOptimizationThreshold == 0 || Iterables.size(discretePredicates.getPredicates()) > rowsForMetadataOptimizationThreshold) { +                return context.defaultRewrite(node); +            } + +            ImmutableList.Builder> rowsBuilder = ImmutableList.builder(); +            for (TupleDomain domain : discretePredicates.getPredicates()) { +                if (domain.isNone()) { +                    continue; +                } +                Map entries = TupleDomain.extractFixedValues(domain).get(); + +                ImmutableList.Builder rowBuilder = ImmutableList.builder(); +                // for each input column, add a literal expression using the entry value +                for (VariableReferenceExpression input : inputs) { +                    ColumnHandle column = columns.get(input); +                    NullableValue value = entries.get(column); +                    if (value == null) { +                        // partition key does not have a single value, so bail out to be safe +                        return context.defaultRewrite(node); +                    } +                    else { +                        rowBuilder.add(new ConstantExpression(Optional.empty(), value.getValue(), input.getType())); +                    } +                } +                rowsBuilder.add(rowBuilder.build()); +            } + +            // replace the tablescan node with a values node +            return ConnectorPlanRewriter.rewriteWith(new Replacer(new ValuesNode(node.getSourceLocation(), idAllocator.getNextId(), inputs, rowsBuilder.build(), Optional.empty())), node); +        } + +        private boolean isReducible(AggregationNode node, List inputs) +        { +            // The 
aggregation is reducible when there is no group by key + if (node.getAggregations().isEmpty() || !node.getGroupingKeys().isEmpty() || !(node.getSource() instanceof TableScanNode)) { + return false; + } + for (Aggregation aggregation : node.getAggregations().values()) { + FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(aggregation.getFunctionHandle()); + if (!AGGREGATION_SCALAR_MAPPING.containsKey(functionMetadata.getName()) || + functionMetadata.getArgumentTypes().size() > 1 || + !inputs.containsAll(aggregation.getCall().getArguments())) { + return false; + } + } + return true; + } + + private PlanNode reduce( + AggregationNode node, + List inputs, + Map columns, + RewriteContext context, + DiscretePredicates predicates) + { + // Fold min/max aggregations to a constant value + ImmutableMap.Builder> inputColumnValuesBuilder = ImmutableMap.builder(); + // For each input partition column, we keep one tuple domain for each constant value. When we get the resulting value, we will get the corresponding tuple domain and + // check if the partition stats can be trusted. 
+ ImmutableMap.Builder>> inputValueToDomainBuilder = ImmutableMap.builder(); + for (VariableReferenceExpression input : inputs) { + ImmutableList.Builder arguments = ImmutableList.builder(); + Map> valueToDomain = new HashMap<>(); + ColumnHandle column = columns.get(input); + // for each input column, add a literal expression using the entry value + for (TupleDomain domain : predicates.getPredicates()) { + if (domain.isNone()) { + continue; + } + Map entries = TupleDomain.extractFixedValues(domain).get(); + NullableValue value = entries.get(column); + if (value == null) { + // partition key does not have a single value, so bail out to be safe + return context.defaultRewrite(node); + } + // min/max ignores null value + else if (value.getValue() != null) { + Type type = input.getType(); + ConstantExpression constantExpression = new ConstantExpression(Optional.empty(), value.getValue(), type); + arguments.add(constantExpression); + valueToDomain.putIfAbsent(constantExpression, domain); + } + } + inputColumnValuesBuilder.put(input, arguments.build()); + inputValueToDomainBuilder.put(input, valueToDomain); + } + Map> inputColumnValues = inputColumnValuesBuilder.build(); + + Assignments.Builder assignmentsBuilder = Assignments.builder(); + for (VariableReferenceExpression outputVariable : node.getOutputVariables()) { + Aggregation aggregation = node.getAggregations().get(outputVariable); + RowExpression inputVariable = getOnlyElement(aggregation.getArguments()); + RowExpression result = evaluateMinMax( + functionMetadataManager.getFunctionMetadata(node.getAggregations().get(outputVariable).getFunctionHandle()), + inputColumnValues.get(inputVariable)); + assignmentsBuilder.put(outputVariable, result); + } + Assignments assignments = assignmentsBuilder.build(); + ValuesNode valuesNode = new ValuesNode(node.getSourceLocation(), idAllocator.getNextId(), node.getOutputVariables(), ImmutableList.of(new ArrayList<>(assignments.getExpressions())), Optional.empty()); + return new 
ProjectNode(node.getSourceLocation(), idAllocator.getNextId(), valuesNode, assignments, LOCAL); + } + + private RowExpression evaluateMinMax(FunctionMetadata aggregationFunctionMetadata, List arguments) + { + Type returnType = typeManager.getType(aggregationFunctionMetadata.getReturnType()); + if (arguments.isEmpty()) { + return new ConstantExpression(Optional.empty(), null, returnType); + } + + String scalarFunctionName = AGGREGATION_SCALAR_MAPPING.get(aggregationFunctionMetadata.getName()).getObjectName(); + while (arguments.size() > 1) { + List reducedArguments = new ArrayList<>(); + // We fold for every 100 values because GREATEST/LEAST has argument count limit + for (List partitionedArguments : Lists.partition(arguments, 100)) { + FunctionHandle functionHandle; + if (scalarFunctionName.equals("greatest")) { + functionHandle = functionResolution.greatestFunction(partitionedArguments.stream().map(RowExpression::getType).collect(toImmutableList())); + } + else if (scalarFunctionName.equals("least")) { + functionHandle = functionResolution.leastFunction(partitionedArguments.stream().map(RowExpression::getType).collect(toImmutableList())); + } + else { + throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "unsupported function: " + scalarFunctionName); + } + + Object reducedValue = rowExpressionService.getExpressionOptimizer().optimize( + new CallExpression( + Optional.empty(), + scalarFunctionName, + functionHandle, + returnType, + partitionedArguments), + Level.EVALUATED, + connectorSession, + variableReferenceExpression -> null); + reducedArguments.add(new ConstantExpression(reducedValue, returnType)); + } + arguments = reducedArguments; + } + return getOnlyElement(arguments); + } + + private static Optional findTableScan(PlanNode source, DeterminismEvaluator determinismEvaluator) + { + while (true) { + // allow any chain of linear transformations + if (source instanceof MarkDistinctNode || + source instanceof FilterNode || + source instanceof SortNode) { 
+ source = source.getSources().get(0); + } + else if (source instanceof ProjectNode) { + // verify projections are deterministic + ProjectNode project = (ProjectNode) source; + if (!Iterables.all(project.getAssignments().getExpressions(), determinismEvaluator::isDeterministic)) { + return Optional.empty(); + } + source = project.getSource(); + } + else if (source instanceof TableScanNode) { + return Optional.of((TableScanNode) source); + } + else { + return Optional.empty(); + } + } + } + + private ConnectorMetadata getConnectorMetadata(TableHandle tableHandle) + { + requireNonNull(icebergTransactionManager, "icebergTransactionManager is null"); + ConnectorMetadata metadata = icebergTransactionManager.get(tableHandle.getTransaction()); + checkState(metadata instanceof IcebergAbstractMetadata, "metadata must be IcebergAbstractMetadata"); + return metadata; + } + } + + private static class Replacer + extends ConnectorPlanRewriter + { + private final ValuesNode replacement; + + private Replacer(ValuesNode replacement) + { + this.replacement = replacement; + } + + @Override + public PlanNode visitTableScan(TableScanNode node, RewriteContext context) + { + return replacement; + } + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizerProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizerProvider.java index e16f8f7ad081d..2150485a5bce5 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizerProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizerProvider.java @@ -50,10 +50,12 @@ public IcebergPlanOptimizerProvider( new IcebergPlanOptimizer(functionResolution, rowExpressionService, functionMetadataManager, transactionManager), new IcebergFilterPushdown(rowExpressionService, functionResolution, functionMetadataManager, transactionManager, typeManager), new 
IcebergParquetDereferencePushDown(transactionManager, rowExpressionService, typeManager)); - this.logicalPlanOptimizers = ImmutableSet.builder() - .addAll(this.planOptimizers) - .add(new IcebergEqualityDeleteAsJoin(functionResolution, transactionManager, typeManager)) - .build(); + this.logicalPlanOptimizers = ImmutableSet.of( + new IcebergPlanOptimizer(functionResolution, rowExpressionService, functionMetadataManager, transactionManager), + new IcebergFilterPushdown(rowExpressionService, functionResolution, functionMetadataManager, transactionManager, typeManager), + new IcebergMetadataOptimizer(functionMetadataManager, typeManager, transactionManager, rowExpressionService, functionResolution), + new IcebergParquetDereferencePushDown(transactionManager, rowExpressionService, typeManager), + new IcebergEqualityDeleteAsJoin(functionResolution, transactionManager, typeManager)); } @Override diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 47a9af0243864..824a519ef6866 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -88,6 +88,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -492,11 +493,15 @@ public Object[][] timezones() @Test(dataProvider = "timezones") public void testPartitionedByTimestampType(String zoneId, boolean legacyTimestamp) { - Session session = sessionForTimezone(zoneId, legacyTimestamp); + Session sessionForTimeZone = sessionForTimezone(zoneId, legacyTimestamp); + testWithAllFileFormats(sessionForTimeZone, (session, fileFormat) -> testPartitionedByTimestampTypeForFormat(session, 
fileFormat)); + } + private void testPartitionedByTimestampTypeForFormat(Session session, FileFormat fileFormat) + { try { // create iceberg table partitioned by column of TimestampType, and insert some data - assertQuerySucceeds(session, "create table test_partition_columns(a bigint, b timestamp) with (partitioning = ARRAY['b'])"); + assertQuerySucceeds(session, format("create table test_partition_columns(a bigint, b timestamp) with (partitioning = ARRAY['b'], format = '%s')", fileFormat.name())); assertQuerySucceeds(session, "insert into test_partition_columns values(1, timestamp '1984-12-08 00:10:00'), (2, timestamp '2001-01-08 12:01:01')"); // validate return data of TimestampType @@ -1439,4 +1444,10 @@ private Session sessionForTimezone(String zoneId, boolean legacyTimestamp) } return sessionBuilder.build(); } + + private void testWithAllFileFormats(Session session, BiConsumer test) + { + test.accept(session, FileFormat.PARQUET); + test.accept(session, FileFormat.ORC); + } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index 4b0e8979887b6..d1d6db239e358 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -54,6 +54,7 @@ public void testDefaults() .setMergeOnReadModeEnabled(true) .setPushdownFilterEnabled(false) .setDeleteAsJoinRewriteEnabled(true) + .setRowsForMetadataOptimizationThreshold(1000) .setManifestCachingEnabled(false) .setFileIOImpl(HadoopFileIO.class.getName()) .setMaxManifestCacheSize(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT) @@ -79,6 +80,7 @@ public void testExplicitPropertyMappings() .put("iceberg.hive-statistics-merge-strategy", NUMBER_OF_DISTINCT_VALUES.name() + "," + TOTAL_SIZE_IN_BYTES.name()) .put("iceberg.pushdown-filter-enabled", "true") .put("iceberg.delete-as-join-rewrite-enabled", 
"false") + .put("iceberg.rows-for-metadata-optimization-threshold", "500") .put("iceberg.io.manifest.cache-enabled", "true") .put("iceberg.io-impl", "com.facebook.presto.iceberg.HdfsFileIO") .put("iceberg.io.manifest.cache.max-total-bytes", "1048576000") @@ -101,6 +103,7 @@ public void testExplicitPropertyMappings() .setHiveStatisticsMergeFlags("NUMBER_OF_DISTINCT_VALUES,TOTAL_SIZE_IN_BYTES") .setPushdownFilterEnabled(true) .setDeleteAsJoinRewriteEnabled(false) + .setRowsForMetadataOptimizationThreshold(500) .setManifestCachingEnabled(true) .setFileIOImpl("com.facebook.presto.iceberg.HdfsFileIO") .setMaxManifestCacheSize(1048576000) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java index 0b7a8fd4953b1..5ac7542f7942d 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java @@ -24,6 +24,7 @@ import com.facebook.presto.metadata.Metadata; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.plan.AggregationNode; import com.facebook.presto.spi.plan.FilterNode; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.TableScanNode; @@ -39,6 +40,8 @@ import com.facebook.presto.sql.planner.assertions.SymbolAliases; import com.facebook.presto.sql.planner.plan.JoinNode; import com.facebook.presto.sql.relational.FunctionResolution; +import com.facebook.presto.sql.tree.LongLiteral; +import com.facebook.presto.sql.tree.StringLiteral; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueryFramework; import com.google.common.base.Functions; @@ -100,6 +103,7 @@ import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; import static 
com.facebook.presto.sql.planner.assertions.PlanMatchPattern.output; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictProject; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictTableScan; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; @@ -126,19 +130,397 @@ protected QueryRunner createQueryRunner() return createIcebergQueryRunner(ImmutableMap.of("experimental.pushdown-subfields-enabled", "true"), ImmutableMap.of()); } - @Test - public void testFilterByUnmatchedValueWithFilterPushdown() + @DataProvider(name = "push_down_filter_enabled") + public Object[][] pushDownFilter() { - Session sessionWithFilterPushdown = pushdownFilterEnabled(); + return new Object[][] { + {true}, + {false}}; + } + + @Test(dataProvider = "push_down_filter_enabled") + public void testMetadataQueryOptimizer(boolean enabled) + { + QueryRunner queryRunner = getQueryRunner(); + Session session = getSessionWithOptimizeMetadataQueries(enabled); + try { + queryRunner.execute("create table metadata_optimize(v1 int, v2 varchar, a int, b varchar)" + + " with(partitioning = ARRAY['a', 'b'])"); + queryRunner.execute("insert into metadata_optimize values" + + " (1, '1001', 1, '1001')," + + " (2, '1002', 2, '1001')," + + " (3, '1003', 3, '1002')," + + " (4, '1004', 4, '1002')"); + + assertQuery(session, "select b, max(a), min(a) from metadata_optimize group by b", + "values('1001', 2, 1), ('1002', 4, 3)"); + assertPlan(session, "select b, max(a), min(a) from metadata_optimize group by b", + anyTree(values( + ImmutableList.of("a", "b"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("2"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("3"), new 
StringLiteral("1002")), + ImmutableList.of(new LongLiteral("4"), new StringLiteral("1002")))))); + + assertQuery(session, "select distinct a, b from metadata_optimize", + "values(1, '1001'), (2, '1001'), (3, '1002'), (4, '1002')"); + assertPlan(session, "select distinct a, b from metadata_optimize", + anyTree(values( + ImmutableList.of("a", "b"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("2"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("3"), new StringLiteral("1002")), + ImmutableList.of(new LongLiteral("4"), new StringLiteral("1002")))))); + + assertQuery(session, "select min(a), max(b) from metadata_optimize", "values(1, '1002')"); + assertPlan(session, "select min(a), max(b) from metadata_optimize", + anyNot(AggregationNode.class, strictProject( + ImmutableMap.of("a", expression("1"), "b", expression("1002")), + anyTree(values())))); + + // Do metadata optimization on a complex query + assertQuery(session, "with tt as (select a, b, concat(cast(a as varchar), b) as c from metadata_optimize where a > 1 and a < 4 order by b desc)" + + " select min(a), max(b), approx_distinct(b), c from tt group by c", + "values(2, '1001', 1, '21001'), (3, '1002', 1, '31002')"); + assertPlan(session, "with tt as (select a, b, concat(cast(a as varchar), b) as c from metadata_optimize where a > 1 and a < 4 order by b desc)" + + " select min(a), max(b), approx_distinct(b), c from tt group by c", + anyTree(values(ImmutableList.of("a", "b"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("2"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("3"), new StringLiteral("1002")))))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS metadata_optimize"); + } + } + + @Test(dataProvider = "push_down_filter_enabled") + public void testMetadataQueryOptimizerOnPartitionEvolution(boolean enabled) + { + QueryRunner queryRunner = getQueryRunner(); + Session session = 
getSessionWithOptimizeMetadataQueries(enabled); + try { + queryRunner.execute("create table metadata_optimize_partition_evolution(v1 int, v2 varchar, a int, b varchar)" + + " with(partitioning = ARRAY['a', 'b'])"); + queryRunner.execute("insert into metadata_optimize_partition_evolution values" + + " (1, '1001', 1, '1001')," + + " (2, '1002', 2, '1001')," + + " (3, '1003', 3, '1002')," + + " (4, '1004', 4, '1002')"); + + queryRunner.execute("alter table metadata_optimize_partition_evolution add column c int with(partitioning = 'identity')"); + queryRunner.execute("insert into metadata_optimize_partition_evolution values" + + " (5, '1005', 5, '1001', 5)," + + " (6, '1006', 6, '1002', 6)"); + + // Do not affect metadata optimization on original partition columns + assertQuery(session, "select b, max(a), min(a) from metadata_optimize_partition_evolution group by b", + "values('1001', 5, 1), ('1002', 6, 3)"); + assertPlan(session, "select b, max(a), min(a) from metadata_optimize_partition_evolution group by b", + anyTree(values( + ImmutableList.of("a", "b"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("2"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("3"), new StringLiteral("1002")), + ImmutableList.of(new LongLiteral("4"), new StringLiteral("1002")), + ImmutableList.of(new LongLiteral("5"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("6"), new StringLiteral("1002")))))); + + // Only non-filterPushDown could run on Iceberg Java Connector + if (!Boolean.valueOf(enabled)) { + assertQuery(session, "select b, max(c), min(c) from metadata_optimize_partition_evolution group by b", + "values('1001', 5, 5), ('1002', 6, 6)"); + } + // New added partition column is not supported for metadata optimization + assertPlan(session, "select b, max(c), min(c) from metadata_optimize_partition_evolution group by b", + 
anyTree(strictTableScan("metadata_optimize_partition_evolution", identityMap("b", "c")))); + + // Do not affect metadata optimization on original partition columns + assertQuery(session, "select distinct a, b from metadata_optimize_partition_evolution", + "values(1, '1001'), (2, '1001'), (3, '1002'), (4, '1002'), (5, '1001'), (6, '1002')"); + assertPlan(session, "select distinct a, b from metadata_optimize_partition_evolution", + anyTree(values(ImmutableList.of("a", "b"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("2"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("3"), new StringLiteral("1002")), + ImmutableList.of(new LongLiteral("4"), new StringLiteral("1002")), + ImmutableList.of(new LongLiteral("5"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("6"), new StringLiteral("1002")))))); + + // Only non-filterPushDown could run on Iceberg Java Connector + if (!Boolean.valueOf(enabled)) { + assertQuery(session, "select distinct a, b, c from metadata_optimize_partition_evolution", + "values(1, '1001', NULL), (2, '1001', NULL), (3, '1002', NULL), (4, '1002', NULL), (5, '1001', 5), (6, '1002', 6)"); + } + + // New added partition column is not supported for metadata optimization + assertPlan(session, "select distinct a, b, c from metadata_optimize_partition_evolution", + anyTree(strictTableScan("metadata_optimize_partition_evolution", identityMap("a", "b", "c")))); + + // Do not affect metadata optimization on original partition columns + assertQuery(session, "select min(a), max(a), min(b), max(b) from metadata_optimize_partition_evolution", "values(1, 6, '1001', '1002')"); + assertPlan(session, "select min(a), max(a), min(b), max(b) from metadata_optimize_partition_evolution", + anyNot(AggregationNode.class, strictProject( + ImmutableMap.of( + "min(a)", expression("1"), + "max(a)", expression("6"), + "min(b)", expression("1001"), + "max(b)", 
expression("1002")), + anyTree(values())))); + + // Only non-filterPushDown could run on Iceberg Java Connector + if (!Boolean.valueOf(enabled)) { + assertQuery(session, "select min(b), max(b), min(c), max(c) from metadata_optimize_partition_evolution", "values('1001', '1002', 5, 6)"); + } + + // New added partition column is not supported for metadata optimization + assertPlan(session, "select min(b), max(b), min(c), max(c) from metadata_optimize_partition_evolution", + anyTree(strictTableScan("metadata_optimize_partition_evolution", identityMap("b", "c")))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS metadata_optimize_partition_evolution"); + } + } + + @Test(dataProvider = "push_down_filter_enabled") + public void testMetadataQueryOptimizationWithLimit(boolean enabled) + { + QueryRunner queryRunner = getQueryRunner(); + Session sessionWithOptimizeMetadataQueries = getSessionWithOptimizeMetadataQueries(enabled); + try { + queryRunner.execute("CREATE TABLE test_metadata_query_optimization_with_limit(a varchar, b int, c int) WITH (partitioning = ARRAY['b', 'c'])"); + queryRunner.execute("INSERT INTO test_metadata_query_optimization_with_limit VALUES" + + " ('1001', 1, 1), ('1002', 1, 1), ('1003', 1, 1)," + + " ('1004', 1, 2), ('1005', 1, 2), ('1006', 1, 2)," + + " ('1007', 2, 1), ('1008', 2, 1), ('1009', 2, 1)"); + + // Could do metadata optimization when `limit` existing above `aggregation` + assertQuery(sessionWithOptimizeMetadataQueries, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3", + "values(1, 2), (1, 1), (2, 1)"); + assertPlan(sessionWithOptimizeMetadataQueries, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3", + anyTree(values(ImmutableList.of("b", "c"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new LongLiteral("2")), + ImmutableList.of(new LongLiteral("1"), new LongLiteral("1")), + ImmutableList.of(new LongLiteral("2"), new 
LongLiteral("1")))))); + + // Should not do metadata optimization when `limit` existing below `aggregation` + // Only non-filterPushDown could run on Iceberg Java Connector + if (!Boolean.valueOf(enabled)) { + assertQuery(sessionWithOptimizeMetadataQueries, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b", + "values(1, 2, 2)"); + } + assertPlan(sessionWithOptimizeMetadataQueries, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b", + anyTree(strictTableScan("test_metadata_query_optimization_with_limit", identityMap("b", "c")))); + } + finally { + queryRunner.execute("DROP TABLE if exists test_metadata_query_optimization_with_limit"); + } + } + + @Test(dataProvider = "push_down_filter_enabled") + public void testMetadataQueryOptimizationWithMetadataEnforcedPredicate(boolean enabled) + { + QueryRunner queryRunner = getQueryRunner(); + Session sessionWithOptimizeMetadataQueries = getSessionWithOptimizeMetadataQueries(enabled); + try { + queryRunner.execute("CREATE TABLE test_with_metadata_enforced_filter(a varchar, b int, c int) WITH (partitioning = ARRAY['b', 'c'])"); + queryRunner.execute("INSERT INTO test_with_metadata_enforced_filter VALUES" + + " ('1001', 1, 1), ('1002', 1, 2), ('1003', 1, 3)," + + " ('1007', 2, 1), ('1008', 2, 2), ('1009', 2, 3)"); + + // Could do metadata optimization when filtering by all metadata-enforced predicate + assertQuery(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_metadata_enforced_filter" + + " where c > 1 group by b", + "values(1, 2, 3), (2, 2, 3)"); + assertPlan(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_metadata_enforced_filter" + + " where c > 1 group by b", + anyTree(values(ImmutableList.of("b", "c"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new 
LongLiteral("2")), + ImmutableList.of(new LongLiteral("1"), new LongLiteral("3")), + ImmutableList.of(new LongLiteral("2"), new LongLiteral("2")), + ImmutableList.of(new LongLiteral("2"), new LongLiteral("3")))))); + + // Another kind of metadata-enforced predicate which could not be push down, could do metadata optimization in such conditions as well + // Only support when do not enable `filter_push_down` + if (!Boolean.valueOf(enabled)) { + assertQuery(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_metadata_enforced_filter" + + " where b + c > 2 and b + c < 5 group by b", + "values(1, 2, 3), (2, 1, 2)"); + assertPlan(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_metadata_enforced_filter" + + " where b + c > 2 and b + c < 5 group by b", + anyTree(filter("b + c > 2 and b + c < 5", + anyTree(values(ImmutableList.of("b", "c"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new LongLiteral("1")), + ImmutableList.of(new LongLiteral("1"), new LongLiteral("2")), + ImmutableList.of(new LongLiteral("1"), new LongLiteral("3")), + ImmutableList.of(new LongLiteral("2"), new LongLiteral("1")), + ImmutableList.of(new LongLiteral("2"), new LongLiteral("2")), + ImmutableList.of(new LongLiteral("2"), new LongLiteral("3")))))))); + } + else { + assertPlan(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_metadata_enforced_filter" + + " where b + c > 2 and b + c < 5 group by b", + anyTree(strictTableScan("test_with_metadata_enforced_filter", identityMap("b", "c")))); + } + } + finally { + queryRunner.execute("DROP TABLE if exists test_with_metadata_enforced_filter"); + } + } + + @Test(dataProvider = "push_down_filter_enabled") + public void testMetadataOptimizationWithNonMetadataEnforcedPredicate(boolean enabled) + { + QueryRunner queryRunner = getQueryRunner(); + Session sessionWithOptimizeMetadataQueries = getSessionWithOptimizeMetadataQueries(enabled); + try { + 
queryRunner.execute("CREATE TABLE test_with_non_metadata_enforced_filter(a row(r1 varchar ,r2 int), b int, c int, d varchar) WITH (partitioning = ARRAY['b', 'c'])"); + queryRunner.execute("INSERT INTO test_with_non_metadata_enforced_filter VALUES" + + " (('1001', 1), 1, 1, 'd001'), (('1002', 2), 1, 1, 'd002'), (('1003', 3), 1, 1, 'd003')," + + " (('1004', 4), 1, 2, 'd004'), (('1005', 5), 1, 2, 'd005'), (('1006', 6), 1, 2, 'd006')," + + " (('1007', 7), 2, 1, 'd007'), (('1008', 8), 2, 1, 'd008'), (('1009', 9), 2, 1, 'd009')"); + + // Should not do metadata optimization when filtering by non-pushdown filter + if (!enabled) { + assertQuery(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_non_metadata_enforced_filter" + + " where a.r1 >= '1003' and a.r1 <= '1007' group by b", + "values(1, 1, 2), (2, 1, 1)"); + assertPlan(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_non_metadata_enforced_filter" + + " where a.r1 >= '1003' and a.r1 <= '1007' group by b", + anyTree(filter("a.r1 between '1003' and '1007'", + strictTableScan("test_with_non_metadata_enforced_filter", identityMap("a", "b", "c"))))); + + assertQuery(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_non_metadata_enforced_filter" + + " where d >= 'd003' and d <= 'd007' group by b", + "values(1, 1, 2), (2, 1, 1)"); + assertPlan(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_non_metadata_enforced_filter" + + " where d >= 'd003' and d <= 'd007' group by b", + anyTree(filter("d between 'd003' and 'd007'", + strictTableScan("test_with_non_metadata_enforced_filter", identityMap("d", "b", "c"))))); + } + else { + assertPlan(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_non_metadata_enforced_filter" + + " where a.r1 >= '1003' and a.r1 <= '1007' group by b", + anyTree(strictTableScan("test_with_non_metadata_enforced_filter", identityMap("b", "c")))); + + 
assertPlan(sessionWithOptimizeMetadataQueries, "select b, min(c), max(c) from test_with_non_metadata_enforced_filter" + + " where d >= 'd003' and d <= 'd007' group by b", + anyTree(strictTableScan("test_with_non_metadata_enforced_filter", identityMap("b", "c")))); + } + } + finally { + queryRunner.execute("DROP TABLE if exists test_with_non_metadata_enforced_filter"); + } + } + + @Test(dataProvider = "push_down_filter_enabled") + public void testMetadataQueryOptimizerOnRowDelete(boolean enabled) + { + QueryRunner queryRunner = getQueryRunner(); + Session session = getSessionWithOptimizeMetadataQueries(enabled); + try { + queryRunner.execute("create table metadata_optimize_on_row_delete(v1 int, v2 varchar, a int, b varchar)" + + " with(partitioning = ARRAY['a', 'b'])"); + queryRunner.execute("insert into metadata_optimize_on_row_delete values" + + " (1, '1001', 1, '1001')," + + " (2, '1002', 2, '1001')," + + " (3, '1003', 3, '1002')," + + " (4, '1004', 4, '1002')"); + + assertUpdate("delete from metadata_optimize_on_row_delete where v1 >= 2", 3); + + // Only non-filterPushDown could run on Iceberg Java Connector + if (!Boolean.valueOf(enabled)) { + assertQuery(session, "select b, max(a), min(a) from metadata_optimize_on_row_delete group by b", + "values('1001', 1, 1)"); + assertQuery(session, "select distinct a, b from metadata_optimize_on_row_delete", + "values(1, '1001')"); + assertQuery(session, "select min(a), max(a), min(b), max(b) from metadata_optimize_on_row_delete", + "values(1, 1, '1001', '1001')"); + } + + // Skip metadata optimization when there existing delete files + assertPlan(session, "select b, max(a), min(a) from metadata_optimize_on_row_delete group by b", + anyTree(strictTableScan("metadata_optimize_on_row_delete", identityMap("a", "b")))); + assertPlan(session, "select distinct a, b from metadata_optimize_on_row_delete", + anyTree(strictTableScan("metadata_optimize_on_row_delete", identityMap("a", "b")))); + assertPlan(session, "select min(a), 
max(a), min(b), max(b) from metadata_optimize_on_row_delete", + anyTree(strictTableScan("metadata_optimize_on_row_delete", identityMap("a", "b")))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS metadata_optimize_on_row_delete"); + } + } + + @Test(dataProvider = "push_down_filter_enabled") + public void testMetadataQueryOptimizerOnMetadataDelete(boolean enabled) + { + QueryRunner queryRunner = getQueryRunner(); + Session session = getSessionWithOptimizeMetadataQueries(enabled); + try { + queryRunner.execute("create table metadata_optimize_on_metadata_delete(v1 int, v2 varchar, a int, b varchar)" + + " with(partitioning = ARRAY['a', 'b'])"); + queryRunner.execute("insert into metadata_optimize_on_metadata_delete values" + + " (0, '1000', 0, '1001')," + + " (1, '1001', 1, '1001')," + + " (2, '1002', 2, '1001')," + + " (3, '1003', 3, '1002')," + + " (4, '1004', 4, '1002')"); + + assertUpdate("delete from metadata_optimize_on_metadata_delete where a >= 2", 3); + + // Do not affect metadata optimization on metadata delete + assertQuery(session, "select b, max(a), min(a) from metadata_optimize_on_metadata_delete group by b", + "values('1001', 1, 0)"); + assertPlan(session, "select b, max(a), min(a) from metadata_optimize_on_metadata_delete group by b", + anyTree(values( + ImmutableList.of("a", "b"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("0"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("1"), new StringLiteral("1001")))))); + + // Do not affect metadata optimization on metadata delete + assertQuery(session, "select distinct a, b from metadata_optimize_on_metadata_delete", + "values(1, '1001'), (0, '1001')"); + assertPlan(session, "select distinct a, b from metadata_optimize_on_metadata_delete", + anyTree(values( + ImmutableList.of("a", "b"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("0"), new StringLiteral("1001")), + ImmutableList.of(new LongLiteral("1"), new StringLiteral("1001")))))); + + // Do not affect 
metadata optimization on metadata delete + assertQuery(session, "select min(a), max(b) from metadata_optimize_on_metadata_delete", "values(0, '1001')"); + assertPlan(session, "select min(a), max(b) from metadata_optimize_on_metadata_delete", + anyNot(AggregationNode.class, strictProject( + ImmutableMap.of("a", expression("0"), "b", expression("1001")), + anyTree(values())))); + } + finally { + queryRunner.execute("DROP TABLE IF EXISTS metadata_optimize_on_metadata_delete"); + } + } + + @Test(dataProvider = "push_down_filter_enabled") + public void testFilterByUnmatchedValue(boolean enabled) + { + Session session = getSessionWithOptimizeMetadataQueries(enabled); String tableName = "test_filter_by_unmatched_value"; assertUpdate("CREATE TABLE " + tableName + " (a varchar, b integer, r row(c int, d varchar)) WITH(partitioning = ARRAY['a'])"); // query with normal column filter on empty table - assertPlan(sessionWithFilterPushdown, "select a, r from " + tableName + " where b = 1001", + assertPlan(session, "select a, r from " + tableName + " where b = 1001", output(values("a", "r"))); // query with partition column filter on empty table - assertPlan(sessionWithFilterPushdown, "select b, r from " + tableName + " where a = 'var3'", + assertPlan(session, "select b, r from " + tableName + " where a = 'var3'", output(values("b", "r"))); assertUpdate("INSERT INTO " + tableName + " VALUES ('var1', 1, (1001, 't1')), ('var1', 3, (1003, 't3'))", 2); @@ -146,11 +528,11 @@ public void testFilterByUnmatchedValueWithFilterPushdown() assertUpdate("INSERT INTO " + tableName + " VALUES ('var1', 2, (1002, 't2')), ('var1', 9, (1009, 't9'))", 2); // query with unmatched normal column filter - assertPlan(sessionWithFilterPushdown, "select a, r from " + tableName + " where b = 1001", + assertPlan(session, "select a, r from " + tableName + " where b = 1001", output(values("a", "r"))); // query with unmatched partition column filter - assertPlan(sessionWithFilterPushdown, "select b, r from " + 
tableName + " where a = 'var3'", + assertPlan(session, "select b, r from " + tableName + " where a = 'var3'", output(values("b", "r"))); assertUpdate("DROP TABLE " + tableName); @@ -163,6 +545,7 @@ public void testFiltersWithPushdownDisable() Session sessionWithoutFilterPushdown = getQueryRunner().getDefaultSession(); assertUpdate("CREATE TABLE test_filters_with_pushdown_disable(id int, name varchar, r row(a int, b varchar)) with (partitioning = ARRAY['id'])"); + assertUpdate("INSERT INTO test_filters_with_pushdown_disable VALUES(10, 'adam', (10, 'adam')), (11, 'hd001', (11, 'hd001'))", 2); // Only identity partition column predicates, would be enforced totally by tableScan assertPlan(sessionWithoutFilterPushdown, "SELECT name, r FROM test_filters_with_pushdown_disable WHERE id = 10", @@ -207,7 +590,7 @@ public void testFiltersWithPushdownDisable() strictTableScan("test_filters_with_pushdown_disable", identityMap("name", "r"))))))); // Predicates expression `in` for identity partition columns could be enforced by iceberg table as well - assertPlan(sessionWithoutFilterPushdown, "SELECT id, name FROM test_filters_with_pushdown_disable WHERE id in (1, 3, 5, 7, 9) and r.b = 'adam'", + assertPlan(sessionWithoutFilterPushdown, "SELECT id, name FROM test_filters_with_pushdown_disable WHERE id in (1, 3, 5, 7, 9, 10) and r.b = 'adam'", output(exchange(project( filter("r.b='adam'", strictTableScan("test_filters_with_pushdown_disable", identityMap("id", "name", "r"))))))); @@ -221,6 +604,7 @@ public void testFiltersWithPushdownDisable() // Add a new identity partitioned column for iceberg table assertUpdate("ALTER TABLE test_filters_with_pushdown_disable add column newpart bigint with (partitioning = 'identity')"); + assertUpdate("INSERT INTO test_filters_with_pushdown_disable VALUES(10, 'newman', (10, 'newman'), 1001)", 1); // Predicates with originally present identity partition column and newly added identity partition column // Only the predicate on originally present 
identity partition column could be enforced by tableScan @@ -233,6 +617,7 @@ public void testFiltersWithPushdownDisable() assertUpdate("DROP TABLE test_filters_with_pushdown_disable"); assertUpdate("CREATE TABLE test_filters_with_pushdown_disable(id int, name varchar, r row(a int, b varchar)) with (partitioning = ARRAY['id', 'truncate(name, 2)'])"); + assertUpdate("INSERT INTO test_filters_with_pushdown_disable VALUES (10, 'hd001', (10, 'newman'))", 1); // Predicates with non-identity partitioned column could not be enforced by tableScan assertPlan(sessionWithoutFilterPushdown, "SELECT id FROM test_filters_with_pushdown_disable WHERE name = 'hd001'", @@ -267,6 +652,10 @@ public void testHourTransform(String zoneId, boolean legacyTimestamp) try { assertUpdate(session, "CREATE TABLE test_hour_transform (d TIMESTAMP, b BIGINT) WITH (partitioning = ARRAY['hour(d)'])"); + assertUpdate(session, "INSERT INTO test_hour_transform values" + + "(NULL, 0), " + + "(TIMESTAMP '1984-12-08 00:00:10', 1), " + + "(TIMESTAMP '2020-01-08 12:00:01', 2)", 3); assertPlan(session, "SELECT * FROM test_hour_transform WHERE d IS NOT NULL", thoroughlyPushdown(strictTableScan("test_hour_transform", identityMap("d", "b")))); @@ -1298,6 +1687,7 @@ public void testParquetDereferencePushDown() "x row(a bigint, b varchar, c double, d row(d1 bigint, d2 double))," + "y array(row(a bigint, b varchar, c double, d row(d1 bigint, d2 double)))) " + "with (format = 'PARQUET')"); + assertUpdate("INSERT INTO test_pushdown_nestedcolumn_parquet(id, x) VALUES(1, (11, 'abcd', 1.1, (1, 5.0)))", 1); assertParquetDereferencePushDown("SELECT x.a FROM test_pushdown_nestedcolumn_parquet", "test_pushdown_nestedcolumn_parquet", @@ -1816,6 +2206,13 @@ private Session sessionForTimezone(String zoneId, boolean legacyTimestamp) return sessionBuilder.build(); } + protected Session getSessionWithOptimizeMetadataQueries(boolean enabled) + { + return Session.builder(super.getSession()) + 
.setCatalogSessionProperty(ICEBERG_CATALOG, PUSHDOWN_FILTER_ENABLED, String.valueOf(enabled)) + .build(); + } + private static Set toSubfields(String... subfieldPaths) { return Arrays.stream(subfieldPaths) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/relational/ConnectorRowExpressionService.java b/presto-main/src/main/java/com/facebook/presto/sql/relational/ConnectorRowExpressionService.java index 35590a73e5290..a8ab25fefc51f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/relational/ConnectorRowExpressionService.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/relational/ConnectorRowExpressionService.java @@ -33,7 +33,12 @@ public final class ConnectorRowExpressionService private final DeterminismEvaluator determinismEvaluator; private final RowExpressionFormatter rowExpressionFormatter; - public ConnectorRowExpressionService(DomainTranslator domainTranslator, ExpressionOptimizer expressionOptimizer, PredicateCompiler predicateCompiler, DeterminismEvaluator determinismEvaluator, RowExpressionFormatter rowExpressionFormatter) + public ConnectorRowExpressionService( + DomainTranslator domainTranslator, + ExpressionOptimizer expressionOptimizer, + PredicateCompiler predicateCompiler, + DeterminismEvaluator determinismEvaluator, + RowExpressionFormatter rowExpressionFormatter) { this.domainTranslator = requireNonNull(domainTranslator, "domainTranslator is null"); this.expressionOptimizer = requireNonNull(expressionOptimizer, "expressionOptimizer is null"); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/relational/FunctionResolution.java b/presto-main/src/main/java/com/facebook/presto/sql/relational/FunctionResolution.java index 7d7efada1cc8a..c680374e32e34 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/relational/FunctionResolution.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/relational/FunctionResolution.java @@ -298,6 +298,12 @@ public FunctionHandle maxFunction(Type 
valueType) return functionAndTypeResolver.lookupFunction("max", fromTypes(valueType)); } + @Override + public FunctionHandle greatestFunction(List valueTypes) + { + return functionAndTypeResolver.lookupFunction("greatest", fromTypes(valueTypes)); + } + @Override public boolean isMinFunction(FunctionHandle functionHandle) { @@ -310,6 +316,12 @@ public FunctionHandle minFunction(Type valueType) return functionAndTypeResolver.lookupFunction("min", fromTypes(valueType)); } + @Override + public FunctionHandle leastFunction(List valueTypes) + { + return functionAndTypeResolver.lookupFunction("least", fromTypes(valueTypes)); + } + @Override public boolean isApproximateCountDistinctFunction(FunctionHandle functionHandle) { diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/StandardFunctionResolution.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/StandardFunctionResolution.java index 6f3e2956090cc..e425b7daf7bca 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/function/StandardFunctionResolution.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/StandardFunctionResolution.java @@ -70,10 +70,14 @@ public interface StandardFunctionResolution FunctionHandle maxFunction(Type valueType); + FunctionHandle greatestFunction(List valueTypes); + boolean isMinFunction(FunctionHandle functionHandle); FunctionHandle minFunction(Type valueType); + FunctionHandle leastFunction(List valueTypes); + boolean isApproximateCountDistinctFunction(FunctionHandle functionHandle); FunctionHandle approximateCountDistinctFunction(Type valueType);