diff --git a/presto-benchto-benchmarks/src/test/java/com/facebook/presto/sql/planner/AbstractCostBasedPlanTest.java b/presto-benchto-benchmarks/src/test/java/com/facebook/presto/sql/planner/AbstractCostBasedPlanTest.java
index 84c71f1111688..46ed546dcd607 100644
--- a/presto-benchto-benchmarks/src/test/java/com/facebook/presto/sql/planner/AbstractCostBasedPlanTest.java
+++ b/presto-benchto-benchmarks/src/test/java/com/facebook/presto/sql/planner/AbstractCostBasedPlanTest.java
@@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.plan.AggregationNode;
+import com.facebook.presto.spi.plan.JoinDistributionType;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.sql.planner.assertions.BasePlanTest;
@@ -36,9 +37,9 @@
import java.nio.file.Paths;
import java.util.stream.Stream;
+import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
+import static com.facebook.presto.spi.plan.JoinType.INNER;
import static com.facebook.presto.sql.Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED;
-import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.REPLICATED;
-import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
import static com.facebook.presto.testing.TestngUtils.toDataProvider;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
@@ -157,7 +158,7 @@ public String result()
@Override
public Void visitJoin(JoinNode node, Integer indent)
{
- JoinNode.DistributionType distributionType = node.getDistributionType()
+ JoinDistributionType distributionType = node.getDistributionType()
.orElseThrow(() -> new VerifyException("Expected distribution type to be set"));
if (node.isCrossJoin()) {
checkState(node.getType() == INNER && distributionType == REPLICATED, "Expected CROSS JOIN to be INNER REPLICATED");
diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst
index c5952654d5b8f..9de464f3e90a8 100644
--- a/presto-docs/src/main/sphinx/connector/iceberg.rst
+++ b/presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -216,6 +216,9 @@ Property Name Description
``iceberg.enable-merge-on-read-mode`` Enable reading base tables that use merge-on-read for ``true``
updates.
+
+``iceberg.delete-as-join-rewrite-enabled`` When enabled, equality delete row filtering is applied ``true``
+ as a join with the data of the equality delete files.
================================================== ============================================================= ============
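+
+For example, to turn the rewrite off for a catalog, the property can be set in the
+catalog properties file (typically ``etc/catalog/iceberg.properties``):
+
+.. code-block:: text
+
+    iceberg.delete-as-join-rewrite-enabled=false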
Table Properties
@@ -269,6 +272,18 @@ and a file system location of ``s3://test_bucket/test_schema/test_table``:
location = 's3://test_bucket/test_schema/test_table')
)
+Session Properties
+-------------------
+
+Session properties change the behavior of queries executed within the current session.
+
+============================================= ======================================================================
+Property Name Description
+============================================= ======================================================================
+``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property
+ ``iceberg.delete-as-join-rewrite-enabled`` in the current session.
+============================================= ======================================================================
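+
+For example, the following statement disables the rewrite for the current session,
+assuming the catalog is named ``iceberg``:
+
+.. code-block:: sql
+
+    SET SESSION iceberg.delete_as_join_rewrite_enabled = false;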
+
Caching Support
----------------
@@ -373,6 +388,38 @@ Metastore cache only caches schema and table names. Other metadata would be fetc
hive.metastore-refresh-interval=3d
hive.metastore-cache-maximum-size=10000000
+Extra Hidden Metadata Columns
+-----------------------------
+
+The Iceberg connector exposes extra hidden metadata columns. You can query these
+as part of a SQL query by including them in your SELECT statement.
+
+``$path`` column
+^^^^^^^^^^^^^^^^
+* ``$path``: Full file system path name of the file for this row
+
+.. code-block:: sql
+
+ SELECT "$path", regionkey FROM "ctas_nation";
+
+.. code-block:: text
+
+ $path | regionkey
+ ---------------------------------+-----------
+ /full/path/to/file/file.parquet | 2
+
+``$data_sequence_number`` column
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+* ``$data_sequence_number``: The Iceberg data sequence number in which this row was added
+
+.. code-block:: sql
+
+ SELECT "$data_sequence_number", regionkey FROM "ctas_nation";
+
+.. code-block:: text
+
+ $data_sequence_number | regionkey
+ ----------------------------------+------------
+ 2 | 3
+
Extra Hidden Metadata Tables
----------------------------
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveHistoryBasedStatsTracking.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveHistoryBasedStatsTracking.java
index 5c25c2bca80c0..ffc7876a40531 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveHistoryBasedStatsTracking.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveHistoryBasedStatsTracking.java
@@ -17,6 +17,7 @@
import com.facebook.presto.execution.SqlQueryManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.plan.AggregationNode;
+import com.facebook.presto.spi.plan.JoinDistributionType;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.sql.planner.Plan;
@@ -131,7 +132,7 @@ public void testBroadcastJoin()
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey", defaultSession());
assertTrue(PlanNodeSearcher.searchFrom(plan.getRoot())
- .where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinNode.DistributionType.PARTITIONED))
+ .where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinDistributionType.PARTITIONED))
.findFirst()
.isPresent());
@@ -146,7 +147,7 @@ public void testBroadcastJoin()
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey", defaultSession());
assertTrue(PlanNodeSearcher.searchFrom(plan.getRoot())
- .where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinNode.DistributionType.REPLICATED))
+ .where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinDistributionType.REPLICATED))
.findFirst()
.isPresent());
}
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java
index e5bf1eb895352..f42aa4c1de014 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java
@@ -111,6 +111,7 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.parquet.ParquetTypeUtils.pushdownColumnNameForSubfield;
+import static com.facebook.presto.spi.plan.JoinType.INNER;
import static com.facebook.presto.sql.analyzer.TypeSignatureProvider.fromTypes;
import static com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH;
import static com.facebook.presto.sql.planner.assertions.MatchResult.match;
@@ -133,7 +134,6 @@
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER;
-import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableList.toImmutableList;
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java
index a4a82fe0a758f..8f1dbd7d3c2e8 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java
@@ -56,6 +56,8 @@
import static com.facebook.presto.hive.TestHiveLogicalPlanner.replicateHiveMetastore;
import static com.facebook.presto.hive.TestHiveLogicalPlanner.utf8Slices;
import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE;
+import static com.facebook.presto.spi.plan.JoinType.INNER;
+import static com.facebook.presto.spi.plan.JoinType.LEFT;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
@@ -71,8 +73,6 @@
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.singleGroupingSet;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.unnest;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values;
-import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
-import static com.facebook.presto.sql.planner.plan.JoinNode.Type.LEFT;
import static com.facebook.presto.testing.TestingAccessControlManager.TestingPrivilegeType.INSERT_TABLE;
import static com.facebook.presto.testing.TestingAccessControlManager.TestingPrivilegeType.SELECT_COLUMN;
import static com.facebook.presto.testing.TestingAccessControlManager.privilege;
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java
index a7336feb82f8b..dc1a093fe1cdd 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestMergeJoinPlan.java
@@ -14,8 +14,8 @@
package com.facebook.presto.hive;
import com.facebook.presto.Session;
+import com.facebook.presto.spi.plan.JoinType;
import com.facebook.presto.sql.planner.assertions.PlanMatchPattern;
-import com.facebook.presto.sql.planner.plan.JoinNode.Type;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableList;
@@ -29,15 +29,15 @@
import static com.facebook.presto.SystemSessionProperties.PREFER_MERGE_JOIN_FOR_SORTED_INPUTS;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.ORDER_BASED_EXECUTION_ENABLED;
+import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
+import static com.facebook.presto.spi.plan.JoinType.FULL;
+import static com.facebook.presto.spi.plan.JoinType.INNER;
+import static com.facebook.presto.spi.plan.JoinType.LEFT;
+import static com.facebook.presto.spi.plan.JoinType.RIGHT;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.mergeJoin;
-import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.PARTITIONED;
-import static com.facebook.presto.sql.planner.plan.JoinNode.Type.FULL;
-import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
-import static com.facebook.presto.sql.planner.plan.JoinNode.Type.LEFT;
-import static com.facebook.presto.sql.planner.plan.JoinNode.Type.RIGHT;
import static io.airlift.tpch.TpchTable.CUSTOMER;
import static io.airlift.tpch.TpchTable.LINE_ITEM;
import static io.airlift.tpch.TpchTable.NATION;
@@ -316,7 +316,7 @@ private Session mergeJoinEnabled()
.build();
}
- private PlanMatchPattern joinPlan(String leftTableName, String rightTableName, List leftJoinKeys, List rightJoinKeys, Type joinType, boolean mergeJoinEnabled)
+ private PlanMatchPattern joinPlan(String leftTableName, String rightTableName, List leftJoinKeys, List rightJoinKeys, JoinType joinType, boolean mergeJoinEnabled)
{
int suffix1 = 0;
int suffix2 = 1;
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java
index c03db26ad47dd..035cb560eb740 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ColumnIdentity.java
@@ -121,10 +121,11 @@ public static ColumnIdentity primitiveColumnIdentity(int id, String name)
public static ColumnIdentity createColumnIdentity(Types.NestedField column)
{
- int id = column.fieldId();
- String name = column.name();
- org.apache.iceberg.types.Type fieldType = column.type();
+ return createColumnIdentity(column.name(), column.fieldId(), column.type());
+ }
+ public static ColumnIdentity createColumnIdentity(String name, int id, org.apache.iceberg.types.Type fieldType)
+ {
if (fieldType.equals(Types.TimestampType.withZone())) {
throw new UnsupportedOperationException(format("Iceberg column type %s is not supported", fieldType));
}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
index fde213fa6eee3..8198a46a58a70 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
@@ -103,8 +103,14 @@
import static com.facebook.presto.hive.MetadataUtils.getPredicate;
import static com.facebook.presto.hive.MetadataUtils.getSubfieldPredicate;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
+import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
+import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA;
+import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
+import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
+import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
+import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.FORMAT_VERSION;
@@ -112,6 +118,7 @@
import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
+import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
@@ -138,7 +145,6 @@
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
-import static java.util.function.Function.identity;
public abstract class IcebergAbstractMetadata
implements ConnectorMetadata
@@ -334,11 +340,16 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
protected ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table, IcebergTableName icebergTableName)
{
Table icebergTable = getIcebergTable(session, new SchemaTableName(table.getSchemaName(), icebergTableName.getTableName()));
- List columns = getColumnMetadatas(icebergTable);
+ ImmutableList.Builder columns = ImmutableList.builder();
+ columns.addAll(getColumnMetadatas(icebergTable));
if (icebergTableName.getTableType() == CHANGELOG) {
- return ChangelogUtil.getChangelogTableMeta(table, typeManager, columns);
+ return ChangelogUtil.getChangelogTableMeta(table, typeManager, columns.build());
}
- return new ConnectorTableMetadata(table, columns, createMetadataProperties(icebergTable), getTableComment(icebergTable));
+ else {
+ columns.add(PATH_COLUMN_METADATA);
+ columns.add(DATA_SEQUENCE_NUMBER_COLUMN_METADATA);
+ }
+ return new ConnectorTableMetadata(table, columns.build(), createMetadataProperties(icebergTable), getTableComment(icebergTable));
}
@Override
@@ -632,8 +643,16 @@ public Map getColumnHandles(ConnectorSession session, Conn
else {
schema = icebergTable.schema();
}
- return getColumns(schema, icebergTable.spec(), typeManager).stream()
- .collect(toImmutableMap(IcebergColumnHandle::getName, identity()));
+
+ ImmutableMap.Builder columnHandles = ImmutableMap.builder();
+ for (IcebergColumnHandle columnHandle : getColumns(schema, icebergTable.spec(), typeManager)) {
+ columnHandles.put(columnHandle.getName(), columnHandle);
+ }
+ if (table.getIcebergTableName().getTableType() != CHANGELOG) {
+ columnHandles.put(FILE_PATH.getColumnName(), PATH_COLUMN_HANDLE);
+ columnHandles.put(DATA_SEQUENCE_NUMBER.getColumnName(), DATA_SEQUENCE_NUMBER_COLUMN_HANDLE);
+ }
+ return columnHandles.build();
}
@Override
@@ -654,7 +673,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional tableVersion)
{
IcebergTableName name = IcebergTableName.from(tableName.getTableName());
- verify(name.getTableType() == DATA || name.getTableType() == CHANGELOG, "Wrong table type: " + name.getTableType());
+ verify(name.getTableType() == DATA || name.getTableType() == CHANGELOG || name.getTableType() == EQUALITY_DELETES, "Wrong table type: " + name.getTableType());
if (!tableExists(session, tableName)) {
return null;
@@ -680,7 +699,9 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
new IcebergTableName(name.getTableName(), name.getTableType(), tableSnapshotId, name.getChangelogEndSnapshot()),
name.getSnapshotId().isPresent(),
TupleDomain.all(),
- tableSchemaJson);
+ tableSchemaJson,
+ Optional.empty(),
+ Optional.empty());
}
@Override
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java
index 14a410db75e38..9c74f8723cdf1 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergColumnHandle.java
@@ -18,6 +18,7 @@
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.BaseHiveColumnHandle;
import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -32,6 +33,8 @@
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.iceberg.ColumnIdentity.createColumnIdentity;
import static com.facebook.presto.iceberg.ColumnIdentity.primitiveColumnIdentity;
+import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
+import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
@@ -42,6 +45,11 @@
public class IcebergColumnHandle
extends BaseHiveColumnHandle
{
+ public static final IcebergColumnHandle PATH_COLUMN_HANDLE = getIcebergColumnHandle(FILE_PATH);
+ public static final ColumnMetadata PATH_COLUMN_METADATA = getColumnMetadata(FILE_PATH);
+ public static final IcebergColumnHandle DATA_SEQUENCE_NUMBER_COLUMN_HANDLE = getIcebergColumnHandle(DATA_SEQUENCE_NUMBER);
+ public static final ColumnMetadata DATA_SEQUENCE_NUMBER_COLUMN_METADATA = getColumnMetadata(DATA_SEQUENCE_NUMBER);
+
private final ColumnIdentity columnIdentity;
private final Type type;
@@ -132,6 +140,39 @@ public String toString()
return getId() + ":" + getName() + ":" + type.getDisplayName() + ":" + getColumnType() + ":" + getRequiredSubfields();
}
+ private static IcebergColumnHandle getIcebergColumnHandle(IcebergMetadataColumn metadataColumn)
+ {
+ return new IcebergColumnHandle(
+ columnIdentity(metadataColumn),
+ metadataColumn.getType(),
+ Optional.empty(),
+ SYNTHESIZED);
+ }
+
+ private static ColumnMetadata getColumnMetadata(IcebergMetadataColumn metadataColumn)
+ {
+ return ColumnMetadata.builder()
+ .setName(metadataColumn.getColumnName())
+ .setType(metadataColumn.getType())
+ .setHidden(true)
+ .build();
+ }
+
+ private static ColumnIdentity columnIdentity(IcebergMetadataColumn metadata)
+ {
+ return new ColumnIdentity(metadata.getId(), metadata.getColumnName(), metadata.getTypeCategory(), ImmutableList.of());
+ }
+
+ public boolean isPathColumn()
+ {
+ return getColumnIdentity().getId() == FILE_PATH.getId();
+ }
+
+ public boolean isDataSequenceNumberColumn()
+ {
+ return getColumnIdentity().getId() == DATA_SEQUENCE_NUMBER.getId();
+ }
+
public static IcebergColumnHandle primitiveIcebergColumnHandle(int id, String name, Type type, Optional comment)
{
return new IcebergColumnHandle(primitiveColumnIdentity(id, name), type, comment, REGULAR);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
index 4dca155eef238..6f8d88f044739 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
@@ -50,6 +50,7 @@ public class IcebergConfig
private boolean mergeOnReadModeEnabled = true;
private double statisticSnapshotRecordDifferenceWeight;
private boolean pushdownFilterEnabled;
+ private boolean deleteAsJoinRewriteEnabled = true;
private HiveStatisticsMergeStrategy hiveStatisticsMergeStrategy = HiveStatisticsMergeStrategy.NONE;
private String fileIOImpl = HadoopFileIO.class.getName();
@@ -235,6 +236,19 @@ public boolean isPushdownFilterEnabled()
return pushdownFilterEnabled;
}
+ @Config("iceberg.delete-as-join-rewrite-enabled")
+ @ConfigDescription("When enabled, equality delete row filtering will be implemented by rewriting the query plan to join with the delete keys.")
+ public IcebergConfig setDeleteAsJoinRewriteEnabled(boolean deleteAsJoinRewriteEnabled)
+ {
+ this.deleteAsJoinRewriteEnabled = deleteAsJoinRewriteEnabled;
+ return this;
+ }
+
+ public boolean isDeleteAsJoinRewriteEnabled()
+ {
+ return deleteAsJoinRewriteEnabled;
+ }
+
public boolean getManifestCachingEnabled()
{
return manifestCachingEnabled;
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataColumn.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataColumn.java
new file mode 100644
index 0000000000000..a3e3d3e258cb1
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergMetadataColumn.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import com.facebook.presto.common.type.Type;
+import org.apache.iceberg.MetadataColumns;
+
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.VarcharType.VARCHAR;
+import static com.facebook.presto.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+public enum IcebergMetadataColumn
+{
+ FILE_PATH(MetadataColumns.FILE_PATH.fieldId(), "$path", VARCHAR, PRIMITIVE),
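+ // Synthetic field ID placed near Integer.MAX_VALUE so it cannot collide with the IDs of regular data columns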
+ DATA_SEQUENCE_NUMBER(Integer.MAX_VALUE - 1001, "$data_sequence_number", BIGINT, PRIMITIVE),
+ /**/;
+
+ private static final Set<Integer> COLUMN_IDS = Stream.of(values())
+ .map(IcebergMetadataColumn::getId)
+ .collect(toImmutableSet());
+ private final int id;
+ private final String columnName;
+ private final Type type;
+ private final ColumnIdentity.TypeCategory typeCategory;
+
+ IcebergMetadataColumn(int id, String columnName, Type type, ColumnIdentity.TypeCategory typeCategory)
+ {
+ this.id = id;
+ this.columnName = columnName;
+ this.type = type;
+ this.typeCategory = typeCategory;
+ }
+
+ public int getId()
+ {
+ return id;
+ }
+
+ public String getColumnName()
+ {
+ return columnName;
+ }
+
+ public Type getType()
+ {
+ return type;
+ }
+
+ public ColumnIdentity.TypeCategory getTypeCategory()
+ {
+ return typeCategory;
+ }
+
+ public static boolean isMetadataColumnId(int id)
+ {
+ return COLUMN_IDS.contains(id);
+ }
+}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSource.java
index 2567fda2d1927..ea6b958de3fa0 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSource.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSource.java
@@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.function.Supplier;
+import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static com.facebook.presto.iceberg.IcebergUtil.deserializePartitionValue;
import static com.google.common.base.Throwables.throwIfInstanceOf;
@@ -49,6 +50,7 @@ public class IcebergPageSource
public IcebergPageSource(
List columns,
+ Map<Integer, Object> metadataValues,
Map<Integer, HivePartitionKey> partitionKeys,
ConnectorPageSource delegate,
Supplier<Optional<RowPredicate>> deletePredicate)
@@ -72,6 +74,16 @@ public IcebergPageSource(
prefilledBlocks[outputIndex] = nativeValueToBlock(type, prefilledValue);
delegateIndexes[outputIndex] = -1;
}
+ else if (column.getColumnType() == PARTITION_KEY) {
+ // Partition key with no value. This can happen after partition evolution
+ Type type = column.getType();
+ prefilledBlocks[outputIndex] = nativeValueToBlock(type, null);
+ delegateIndexes[outputIndex] = -1;
+ }
+ else if (IcebergMetadataColumn.isMetadataColumnId(column.getId())) {
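+ // Metadata columns such as $path and $data_sequence_number are constant for the whole split, so prefill them like partition values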
+ prefilledBlocks[outputIndex] = nativeValueToBlock(column.getType(), metadataValues.get(column.getColumnIdentity().getId()));
+ delegateIndexes[outputIndex] = -1;
+ }
else {
delegateIndexes[outputIndex] = delegateIndex;
delegateIndex++;
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java
index 84c46b5c2d18e..faa417dd0e13b 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java
@@ -107,6 +107,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -118,6 +119,7 @@
import java.util.stream.IntStream;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
+import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.CacheQuota.NO_CACHE_CONSTRAINTS;
@@ -724,13 +726,17 @@ public ConnectorPageSource createPageSource(
List regularColumns = columns.stream()
.map(IcebergColumnHandle.class::cast)
- .filter(column -> !partitionKeys.containsKey(column.getId()))
+ .filter(column -> column.getColumnType() != PARTITION_KEY &&
+ !partitionKeys.containsKey(column.getId()) &&
+ !IcebergMetadataColumn.isMetadataColumnId(column.getId()))
.collect(Collectors.toList());
Optional tableSchemaJson = table.getTableSchemaJson();
verify(tableSchemaJson.isPresent(), "tableSchemaJson is null");
Schema tableSchema = SchemaParser.fromJson(tableSchemaJson.get());
- Set deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, split.getDeletes());
+
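+ // Equality deletes only need to be applied during the scan for the plain DATA table type;
+ // otherwise they are expected to be handled by the delete-as-join rewrite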
+ boolean equalityDeletesRequired = table.getIcebergTableName().getTableType() == IcebergTableType.DATA;
+ Set deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, split.getDeletes(), equalityDeletesRequired);
deleteFilterRequiredColumns.stream()
.filter(not(icebergColumns::contains))
@@ -751,11 +757,17 @@ public ConnectorPageSource createPageSource(
ConnectorPageSource dataPageSource = connectorPageSourceWithRowPositions.getConnectorPageSource();
+ Supplier<Optional<RowPredicate>> deletePredicate = Suppliers.memoize(() -> {
+ // If equality deletes are optimized into a join they don't need to be applied here
+ List<DeleteFile> deletesToApply = split
+ .getDeletes()
+ .stream()
+ .filter(deleteFile -> deleteFile.content() == POSITION_DELETES || equalityDeletesRequired)
+ .collect(toImmutableList());
List deleteFilters = readDeletes(
session,
tableSchema,
split.getPath(),
- split.getDeletes(),
+ deletesToApply,
connectorPageSourceWithRowPositions.getStartRowPosition(),
connectorPageSourceWithRowPositions.getEndRowPosition());
return deleteFilters.stream()
@@ -763,21 +775,31 @@ public ConnectorPageSource createPageSource(
.reduce(RowPredicate::and);
});
- ConnectorPageSource dataSource = new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, deletePredicate);
+ HashMap<Integer, Object> metadataValues = new HashMap<>();
+ for (IcebergColumnHandle icebergColumn : icebergColumns) {
+ if (icebergColumn.isPathColumn()) {
+ metadataValues.put(icebergColumn.getColumnIdentity().getId(), utf8Slice(split.getPath()));
+ }
+ else if (icebergColumn.isDataSequenceNumberColumn()) {
+ metadataValues.put(icebergColumn.getColumnIdentity().getId(), split.getDataSequenceNumber());
+ }
+ }
+
+ ConnectorPageSource dataSource = new IcebergPageSource(icebergColumns, metadataValues, partitionKeys, dataPageSource, deletePredicate);
if (split.getChangelogSplitInfo().isPresent()) {
dataSource = new ChangelogPageSource(dataSource, split.getChangelogSplitInfo().get(), (List<IcebergColumnHandle>) (List<?>) desiredColumns, icebergColumns);
}
return dataSource;
}
- private Set<IcebergColumnHandle> requiredColumnsForDeletes(Schema schema, List<DeleteFile> deletes)
+ private Set<IcebergColumnHandle> requiredColumnsForDeletes(Schema schema, List<DeleteFile> deletes, boolean equalityDeletesRequired)
{
ImmutableSet.Builder requiredColumns = ImmutableSet.builder();
for (DeleteFile deleteFile : deletes) {
if (deleteFile.content() == POSITION_DELETES) {
requiredColumns.add(IcebergColumnHandle.create(ROW_POSITION, typeManager, IcebergColumnHandle.ColumnType.REGULAR));
}
- else if (deleteFile.content() == EQUALITY_DELETES) {
+ else if (deleteFile.content() == EQUALITY_DELETES && equalityDeletesRequired) {
deleteFile.equalityFieldIds().stream()
.map(id -> IcebergColumnHandle.create(schema.findField(id), typeManager, IcebergColumnHandle.ColumnType.REGULAR))
.forEach(requiredColumns::add);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java
index 5531efd425336..fe4c51aa75492 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java
@@ -55,6 +55,7 @@ public final class IcebergSessionProperties
public static final String PARQUET_DEREFERENCE_PUSHDOWN_ENABLED = "parquet_dereference_pushdown_enabled";
public static final String MERGE_ON_READ_MODE_ENABLED = "merge_on_read_enabled";
public static final String PUSHDOWN_FILTER_ENABLED = "pushdown_filter_enabled";
+ public static final String DELETE_AS_JOIN_REWRITE_ENABLED = "delete_as_join_rewrite_enabled";
public static final String HIVE_METASTORE_STATISTICS_MERGE_STRATEGY = "hive_statistics_merge_strategy";
public static final String STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT = "statistic_snapshot_record_difference_weight";
@@ -175,6 +176,11 @@ public IcebergSessionProperties(
"value of 1 means a single record is equivalent to 1 millisecond of " +
"time difference.",
icebergConfig.getStatisticSnapshotRecordDifferenceWeight(),
+ false),
+ booleanProperty(
+ DELETE_AS_JOIN_REWRITE_ENABLED,
+ "When enabled equality delete row filtering will be pushed down into a join.",
+ icebergConfig.isDeleteAsJoinRewriteEnabled(),
false));
}
@@ -280,4 +286,9 @@ public static double getStatisticSnapshotRecordDifferenceWeight(ConnectorSession
{
return session.getProperty(STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT, Double.class);
}
+
+ public static boolean isDeleteToJoinPushdownEnabled(ConnectorSession session)
+ {
+ return session.getProperty(DELETE_AS_JOIN_REWRITE_ENABLED, Boolean.class);
+ }
}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java
index 013b9fe3c9927..bf807512272ab 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java
@@ -49,6 +49,7 @@ public class IcebergSplit
private final SplitWeight splitWeight;
private final List deletes;
private final Optional changelogSplitInfo;
+ private final long dataSequenceNumber;
@JsonCreator
public IcebergSplit(
@@ -61,7 +62,8 @@ public IcebergSplit(
@JsonProperty("nodeSelectionStrategy") NodeSelectionStrategy nodeSelectionStrategy,
@JsonProperty("splitWeight") SplitWeight splitWeight,
@JsonProperty("deletes") List deletes,
- @JsonProperty("changelogSplitInfo") Optional changelogSplitInfo)
+ @JsonProperty("changelogSplitInfo") Optional changelogSplitInfo,
+ @JsonProperty("dataSequenceNumber") long dataSequenceNumber)
{
requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
this.path = requireNonNull(path, "path is null");
@@ -74,6 +76,7 @@ public IcebergSplit(
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
this.changelogSplitInfo = requireNonNull(changelogSplitInfo, "changelogSplitInfo is null");
+ this.dataSequenceNumber = dataSequenceNumber;
}
@JsonProperty
@@ -147,6 +150,12 @@ public Optional getChangelogSplitInfo()
return changelogSplitInfo;
}
+ @JsonProperty
+ public long getDataSequenceNumber()
+ {
+ return dataSequenceNumber;
+ }
+
@Override
public Object getInfo()
{
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java
index b55196dc2ca8b..e9ed7f0454de3 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java
@@ -16,6 +16,7 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.iceberg.changelog.ChangelogSplitSource;
+import com.facebook.presto.iceberg.equalitydeletes.EqualityDeletesSplitSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
@@ -23,9 +24,11 @@
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.IncrementalChangelogScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
@@ -36,6 +39,7 @@
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
+import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static java.util.Objects.requireNonNull;
@@ -86,6 +90,15 @@ public ConnectorSplitSource getSplits(
.toSnapshot(toSnapshot);
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, scan.targetSplitSize());
}
+ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
+ CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
+ table.getIcebergTableName().getSnapshotId().get(),
+ table.getPredicate(),
+ table.getPartitionSpecId(),
+ table.getEqualityFieldIds());
+
+ return new EqualityDeletesSplitSource(session, icebergTable, deleteFiles);
+ }
else {
TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(predicate))
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java
index f901ec1f6bdd9..b908d9ef7c80a 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java
@@ -35,6 +35,7 @@
import java.util.concurrent.CompletableFuture;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
+import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
@@ -114,6 +115,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
getNodeSelectionStrategy(session),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)),
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
- Optional.empty());
+ Optional.empty(),
+ getDataSequenceNumber(task.file()));
}
}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java
index e411e0ef72238..6c6be167e2faa 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java
@@ -20,6 +20,7 @@
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import static java.util.Objects.requireNonNull;
@@ -30,6 +31,8 @@ public class IcebergTableHandle
private final TupleDomain predicate;
private final boolean snapshotSpecified;
private final Optional tableSchemaJson;
+ private final Optional<Set<Integer>> partitionFieldIds;
+ private final Optional<Set<Integer>> equalityFieldIds;
@JsonCreator
public IcebergTableHandle(
@@ -37,7 +40,9 @@ public IcebergTableHandle(
@JsonProperty("icebergTableName") IcebergTableName icebergTableName,
@JsonProperty("snapshotSpecified") boolean snapshotSpecified,
@JsonProperty("predicate") TupleDomain predicate,
- @JsonProperty("tableSchemaJson") Optional tableSchemaJson)
+ @JsonProperty("tableSchemaJson") Optional tableSchemaJson,
+ @JsonProperty("partitionFieldIds") Optional> partitionFieldIds,
+ @JsonProperty("equalityFieldIds") Optional> equalityFieldIds)
{
super(schemaName, icebergTableName.getTableName());
@@ -45,6 +50,8 @@ public IcebergTableHandle(
this.snapshotSpecified = snapshotSpecified;
this.predicate = requireNonNull(predicate, "predicate is null");
this.tableSchemaJson = requireNonNull(tableSchemaJson, "tableSchemaJson is null");
+ this.partitionFieldIds = requireNonNull(partitionFieldIds, "partitionFieldIds is null");
+ this.equalityFieldIds = requireNonNull(equalityFieldIds, "equalityFieldIds is null");
}
@JsonProperty
@@ -71,6 +78,18 @@ public Optional getTableSchemaJson()
return tableSchemaJson;
}
+ @JsonProperty
+ public Optional<Set<Integer>> getPartitionSpecId()
+ {
+ return partitionFieldIds;
+ }
+
+ @JsonProperty
+ public Optional<Set<Integer>> getEqualityFieldIds()
+ {
+ return equalityFieldIds;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -86,13 +105,14 @@ public boolean equals(Object o)
Objects.equals(icebergTableName, that.icebergTableName) &&
snapshotSpecified == that.snapshotSpecified &&
Objects.equals(predicate, that.predicate) &&
- Objects.equals(tableSchemaJson, that.tableSchemaJson);
+ Objects.equals(tableSchemaJson, that.tableSchemaJson) &&
+ Objects.equals(equalityFieldIds, that.equalityFieldIds);
}
@Override
public int hashCode()
{
- return Objects.hash(getSchemaName(), icebergTableName, predicate, snapshotSpecified, tableSchemaJson);
+ return Objects.hash(getSchemaName(), icebergTableName, predicate, snapshotSpecified, tableSchemaJson, equalityFieldIds);
}
@Override
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableName.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableName.java
index cbaabe3011694..87ec01e3c037d 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableName.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableName.java
@@ -127,6 +127,10 @@ public static IcebergTableName from(String name)
}
}
+ if (!type.isPublic()) {
+ throw new PrestoException(NOT_SUPPORTED, format("Internal Iceberg table name (type '%s'): %s", typeString, name));
+ }
+
Optional version = Optional.empty();
Optional changelogEndVersion = Optional.empty();
if (type == DATA || type == PARTITIONS || type == MANIFESTS || type == FILES) {
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableType.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableType.java
index 853c38fa48a49..cd46f344f359a 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableType.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableType.java
@@ -15,12 +15,26 @@
public enum IcebergTableType
{
- DATA,
- HISTORY,
- SNAPSHOTS,
- MANIFESTS,
- PARTITIONS,
- FILES,
- PROPERTIES,
- CHANGELOG
+ DATA(true),
+ HISTORY(true),
+ SNAPSHOTS(true),
+ MANIFESTS(true),
+ PARTITIONS(true),
+ FILES(true),
+ PROPERTIES(true),
+ CHANGELOG(true),
+ EQUALITY_DELETES(true),
+ DATA_WITHOUT_EQUALITY_DELETES(false);
+
+ private final boolean isPublic;
+
+ IcebergTableType(boolean isPublic)
+ {
+ this.isPublic = isPublic;
+ }
+
+ public boolean isPublic()
+ {
+ return isPublic;
+ }
}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
index 989baec687622..adb8db329c851 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
@@ -43,9 +43,13 @@
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HistoryEntry;
@@ -62,6 +66,7 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
@@ -78,6 +83,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -86,6 +92,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.facebook.presto.common.type.BigintType.BIGINT;
@@ -147,6 +154,7 @@
import static java.lang.Long.parseLong;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
+import static java.util.Collections.emptyIterator;
import static java.util.Comparator.comparing;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
@@ -710,18 +718,24 @@ public static Map<Integer, HivePartitionKey> getPartitionKeys(ContentScanTask<?> scanTask)
- Map<PartitionField, Integer> fieldToIndex = getIdentityPartitions(spec);
+ return getPartitionKeys(spec, partition);
+ }
+
+ public static Map getPartitionKeys(PartitionSpec spec, StructLike partition)
+ {
Map partitionKeys = new HashMap<>();
- fieldToIndex.forEach((field, index) -> {
- int id = field.sourceId();
+ int index = 0;
+ for (PartitionField field : spec.fields()) {
+ int sourceId = field.sourceId();
String colName = field.name();
- org.apache.iceberg.types.Type type = spec.schema().findType(id);
+ org.apache.iceberg.types.Type sourceType = spec.schema().findType(sourceId);
+ org.apache.iceberg.types.Type type = field.transform().getResultType(sourceType);
Class<?> javaClass = type.typeId().javaClass();
Object value = partition.get(index, javaClass);
if (value == null) {
- partitionKeys.put(id, new HivePartitionKey(colName, Optional.empty()));
+ partitionKeys.put(field.fieldId(), new HivePartitionKey(colName, Optional.empty()));
}
else {
HivePartitionKey partitionValue;
@@ -732,9 +746,13 @@ public static Map getPartitionKeys(ContentScanTask properties, Iceberg
properties.put(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, String.valueOf(icebergConfig.getManifestCacheMaxContentLength()));
properties.put(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, String.valueOf(icebergConfig.getManifestCacheExpireDuration()));
}
+
+ public static long getDataSequenceNumber(ContentFile<?> file)
+ {
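+ // dataSequenceNumber() can be unknown (null); fall back to the file sequence number in that case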
+ if (file.dataSequenceNumber() != null) {
+ return file.dataSequenceNumber();
+ }
+ return file.fileSequenceNumber();
+ }
+
+ /**
+ * Provides the delete files that need to be applied to the given table snapshot.
+ *
+ * @param table The table to provide deletes for
+ * @param snapshot The snapshot id to use
+ * @param filter Filters to apply during planning
+ * @param requestedPartitionSpec If provided, only delete files whose partition spec field ids match this set are returned
+ * @param requestedSchema If provided, only equality delete files whose equality field ids match this set are returned; position delete files are not filtered by schema
+ */
+ public static CloseableIterable<DeleteFile> getDeleteFiles(Table table,
+ long snapshot,
+ TupleDomain<IcebergColumnHandle> filter,
+ Optional<Set<Integer>> requestedPartitionSpec,
+ Optional<Set<Integer>> requestedSchema)
+ {
+ Expression filterExpression = toIcebergExpression(filter);
+ CloseableIterable<FileScanTask> fileTasks = table.newScan().useSnapshot(snapshot).filter(filterExpression).planFiles();
+
+ return new CloseableIterable<DeleteFile>()
+ {
+ @Override
+ public void close()
+ throws IOException
+ {
+ fileTasks.close();
+ }
+
+ @Override
+ public CloseableIterator<DeleteFile> iterator()
+ {
+ return new DeleteFilesIterator(table.specs(), fileTasks.iterator(), requestedPartitionSpec, requestedSchema);
+ }
+ };
+ }
+
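+ /**
+ * Iterates over the distinct delete files referenced by the scan tasks of a snapshot, optionally
+ * restricted to a requested partition spec (by field ids) and a requested set of equality field ids.
+ */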
+ private static class DeleteFilesIterator
+ implements CloseableIterator<DeleteFile>
+ {
+ private final Set<String> seenFiles = new HashSet<>();
+ private final Map<Integer, PartitionSpec> partitionSpecsById;
+ private CloseableIterator<FileScanTask> fileTasks;
+ private final Optional<Set<Integer>> requestedPartitionSpec;
+ private final Optional<Set<Integer>> requestedSchema;
+ private Iterator<DeleteFile> currentDeletes = emptyIterator();
+ private DeleteFile currentFile;
+
+ private DeleteFilesIterator(Map<Integer, PartitionSpec> partitionSpecsById,
+ CloseableIterator<FileScanTask> fileTasks,
+ Optional<Set<Integer>> requestedPartitionSpec,
+ Optional<Set<Integer>> requestedSchema)
+ {
+ this.partitionSpecsById = partitionSpecsById;
+ this.fileTasks = fileTasks;
+ this.requestedPartitionSpec = requestedPartitionSpec;
+ this.requestedSchema = requestedSchema;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return currentFile != null || advance();
+ }
+
+ private boolean advance()
+ {
+ currentFile = null;
+ while (currentFile == null && (currentDeletes.hasNext() || fileTasks.hasNext())) {
+ if (!currentDeletes.hasNext()) {
+ currentDeletes = fileTasks.next().deletes().iterator();
+ }
+ while (currentDeletes.hasNext()) {
+ DeleteFile deleteFile = currentDeletes.next();
+ // Skip delete files that do not match the requested partition spec or equality field ids
+ if (shouldIncludeFile(deleteFile)) {
+ // A delete file can be referenced by multiple scan tasks; only return it once
+ if (seenFiles.add(deleteFile.path().toString())) {
+ currentFile = deleteFile;
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public DeleteFile next()
+ {
+ DeleteFile result = currentFile;
+ advance();
+ return result;
+ }
+
+ private boolean shouldIncludeFile(DeleteFile file)
+ {
+ boolean matchesPartition = !requestedPartitionSpec.isPresent() ||
+ requestedPartitionSpec.get().equals(partitionSpecsById.get(file.specId()).fields().stream().map(PartitionField::fieldId).collect(Collectors.toSet()));
+ return matchesPartition && (file.content() == FileContent.POSITION_DELETES ||
+ !requestedSchema.isPresent() || requestedSchema.get().equals(ImmutableSet.copyOf(file.equalityFieldIds())));
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ fileTasks.close();
+ // TODO: remove this workaround (and make fileTasks final) once
+ // org.apache.iceberg.io.CloseableIterator#withClose correctly releases
+ // the resources held by the iterator.
+ fileTasks = CloseableIterator.empty();
+ }
+ }
}
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java
index 5afacd9527a98..f068d7ecd0659 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java
@@ -29,8 +29,9 @@
import com.google.common.collect.ImmutableMap;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.HasTableOperations;
@@ -138,42 +139,12 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
.filter(column -> !identityPartitionIds.contains(column.fieldId()) && column.type().isPrimitiveType())
.collect(toImmutableList());
- TableScan tableScan = icebergTable.newScan()
- .filter(toIcebergExpression(intersection))
- .select(selectedColumns.stream().map(IcebergColumnHandle::getName).collect(Collectors.toList()))
- .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get())
- .includeColumnStats();
-
- Partition summary = null;
- try (CloseableIterable fileScanTasks = tableScan.planFiles()) {
- for (FileScanTask fileScanTask : fileScanTasks) {
- DataFile dataFile = fileScanTask.file();
-
- if (summary == null) {
- summary = new Partition(
- idToTypeMapping,
- nonPartitionPrimitiveColumns,
- dataFile.partition(),
- dataFile.recordCount(),
- dataFile.fileSizeInBytes(),
- toMap(idToTypeMapping, dataFile.lowerBounds()),
- toMap(idToTypeMapping, dataFile.upperBounds()),
- dataFile.nullValueCounts(),
- dataFile.columnSizes());
- }
- else {
- summary.incrementFileCount();
- summary.incrementRecordCount(dataFile.recordCount());
- summary.incrementSize(dataFile.fileSizeInBytes());
- updateSummaryMin(summary, partitionFields, toMap(idToTypeMapping, dataFile.lowerBounds()), dataFile.nullValueCounts(), dataFile.recordCount());
- updateSummaryMax(summary, partitionFields, toMap(idToTypeMapping, dataFile.upperBounds()), dataFile.nullValueCounts(), dataFile.recordCount());
- summary.updateNullCount(dataFile.nullValueCounts());
- updateColumnSizes(summary, dataFile.columnSizes());
- }
- }
+ Partition summary;
+ if (tableHandle.getIcebergTableName().getTableType() == IcebergTableType.EQUALITY_DELETES) {
+ summary = getEqualityDeleteTableSummary(tableHandle, intersection, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
}
- catch (IOException e) {
- throw new UncheckedIOException(e);
+ else {
+ summary = getDataTableSummary(tableHandle, selectedColumns, intersection, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
}
if (summary == null) {
@@ -211,6 +182,75 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
return result.build();
}
+ private Partition getDataTableSummary(IcebergTableHandle tableHandle,
+ List selectedColumns,
+ TupleDomain intersection,
+ Map idToTypeMapping,
+ List nonPartitionPrimitiveColumns,
+ List partitionFields)
+ {
+ TableScan tableScan = icebergTable.newScan()
+ .filter(toIcebergExpression(intersection))
+ .select(selectedColumns.stream().map(IcebergColumnHandle::getName).collect(Collectors.toList()))
+ .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get())
+ .includeColumnStats();
+
+ CloseableIterable<ContentFile<?>> files = CloseableIterable.transform(tableScan.planFiles(), ContentScanTask::file);
+ return getSummaryFromFiles(files, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
+ }
+
+ private Partition getEqualityDeleteTableSummary(IcebergTableHandle tableHandle,
+ TupleDomain intersection,
+ Map idToTypeMapping,
+ List nonPartitionPrimitiveColumns,
+ List partitionFields)
+ {
+ CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
+ tableHandle.getIcebergTableName().getSnapshotId().get(),
+ intersection,
+ tableHandle.getPartitionSpecId(),
+ tableHandle.getEqualityFieldIds());
+ CloseableIterable<ContentFile<?>> files = CloseableIterable.transform(deleteFiles, deleteFile -> deleteFile);
+ return getSummaryFromFiles(files, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
+ }
+
+ private Partition getSummaryFromFiles(CloseableIterable<ContentFile<?>> files,
+ Map idToTypeMapping,
+ List nonPartitionPrimitiveColumns,
+ List partitionFields)
+ {
+ Partition summary = null;
+ try (CloseableIterable<ContentFile<?>> filesHolder = files) {
+ for (ContentFile<?> contentFile : filesHolder) {
+ if (summary == null) {
+ summary = new Partition(
+ idToTypeMapping,
+ nonPartitionPrimitiveColumns,
+ contentFile.partition(),
+ contentFile.recordCount(),
+ contentFile.fileSizeInBytes(),
+ toMap(idToTypeMapping, contentFile.lowerBounds()),
+ toMap(idToTypeMapping, contentFile.upperBounds()),
+ contentFile.nullValueCounts(),
+ contentFile.columnSizes());
+ }
+ else {
+ summary.incrementFileCount();
+ summary.incrementRecordCount(contentFile.recordCount());
+ summary.incrementSize(contentFile.fileSizeInBytes());
+ updateSummaryMin(summary, partitionFields, toMap(idToTypeMapping, contentFile.lowerBounds()), contentFile.nullValueCounts(), contentFile.recordCount());
+ updateSummaryMax(summary, partitionFields, toMap(idToTypeMapping, contentFile.upperBounds()), contentFile.nullValueCounts(), contentFile.recordCount());
+ summary.updateNullCount(contentFile.nullValueCounts());
+ updateColumnSizes(summary, contentFile.columnSizes());
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ return summary;
+ }
+
public static void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle tableHandle, Table icebergTable, ConnectorSession session, Collection computedStatistics)
{
new TableStatisticsMaker(icebergTable, session).writeTableStatistics(nodeVersion, tableHandle, computedStatistics);
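Both new summary paths funnel into getSummaryFromFiles(), which seeds the summary from the first content file and then folds every subsequent file in: counts are added, bounds are widened. Below is a minimal sketch of that fold, using hypothetical simplified types (plain maps keyed by column id) rather than the connector's Partition class.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical simplified types; the real code folds into the connector's Partition class.
    final class SummaryFoldSketch
    {
        static final class FileStats
        {
            long recordCount;
            long sizeInBytes;
            Map<Integer, Long> lowerBounds = new HashMap<>();   // column id -> min value
            Map<Integer, Long> upperBounds = new HashMap<>();   // column id -> max value
            Map<Integer, Long> nullCounts = new HashMap<>();    // column id -> null count
        }

        static final class Summary
        {
            long fileCount;
            long recordCount;
            long sizeInBytes;
            final Map<Integer, Long> lowerBounds = new HashMap<>();
            final Map<Integer, Long> upperBounds = new HashMap<>();
            final Map<Integer, Long> nullCounts = new HashMap<>();
        }

        // Mirrors the loop above: every file adds to the counts and widens the bounds.
        static Summary fold(List<FileStats> files)
        {
            Summary summary = new Summary();
            for (FileStats file : files) {
                summary.fileCount++;
                summary.recordCount += file.recordCount;
                summary.sizeInBytes += file.sizeInBytes;
                file.lowerBounds.forEach((id, value) -> summary.lowerBounds.merge(id, value, Math::min));
                file.upperBounds.forEach((id, value) -> summary.upperBounds.merge(id, value, Math::max));
                file.nullCounts.forEach((id, value) -> summary.nullCounts.merge(id, value, Long::sum));
            }
            return summary;
        }
    }

The real loop additionally passes the partition fields and per-file null counts into updateSummaryMin/updateSummaryMax when widening bounds; the sketch omits that detail.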
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java
index 5e0052ec5e811..cc301573cbeef 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java
@@ -43,6 +43,7 @@
import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
+import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.google.common.collect.Iterators.limit;
@@ -128,6 +129,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask task, Ch
Optional.of(new ChangelogSplitInfo(changeTask.operation(),
changeTask.changeOrdinal(),
changeTask.commitSnapshotId(),
- columnHandles)));
+ columnHandles)),
+ getDataSequenceNumber(task.file()));
}
}
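The changelog split now carries the data sequence number of its backing file. This is the value the equality-delete-as-join rewrite later compares against a delete file's sequence number: a row is only affected by delete files with a strictly greater sequence number, per the `<` predicate built in IcebergEqualityDeleteAsJoin. A plausible sketch of such a helper is shown below, assuming Iceberg's nullable ContentFile#dataSequenceNumber(); the actual IcebergUtil.getDataSequenceNumber implementation may differ.

    import org.apache.iceberg.ContentFile;

    final class DataSequenceNumberSketch
    {
        // Hypothetical helper: dataSequenceNumber() is nullable (e.g. for metadata
        // written before sequence numbers were tracked), so fall back to 0.
        static long getDataSequenceNumber(ContentFile<?> file)
        {
            Long sequenceNumber = file.dataSequenceNumber();
            return sequenceNumber == null ? 0 : sequenceNumber;
        }
    }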
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/equalitydeletes/EqualityDeletesSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/equalitydeletes/EqualityDeletesSplitSource.java
new file mode 100644
index 0000000000000..948779f33b2a0
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/equalitydeletes/EqualityDeletesSplitSource.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.equalitydeletes;
+
+import com.facebook.presto.iceberg.IcebergSplit;
+import com.facebook.presto.iceberg.IcebergUtil;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.SplitWeight;
+import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
+import com.google.common.collect.ImmutableList;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
+import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
+import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
+import static com.google.common.collect.Iterators.limit;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+public class EqualityDeletesSplitSource
+ implements ConnectorSplitSource
+{
+ private final ConnectorSession session;
+ private final Map<Integer, PartitionSpec> specById;
+ private CloseableIterator<DeleteFile> deleteFiles;
+
+ public EqualityDeletesSplitSource(
+ ConnectorSession session,
+ Table table,
+ CloseableIterable<DeleteFile> deleteFiles)
+ {
+ this.session = requireNonNull(session, "session is null");
+ requireNonNull(table, "table is null");
+ requireNonNull(deleteFiles, "deleteFiles is null");
+ this.specById = table.specs();
+ this.deleteFiles = CloseableIterable.filter(deleteFiles, deleteFile -> deleteFile.content() == FileContent.EQUALITY_DELETES).iterator();
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return !deleteFiles.hasNext();
+ }
+
+ @Override
+ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
+ {
+ ImmutableList.Builder<ConnectorSplit> splits = new ImmutableList.Builder<>();
+ Iterator<DeleteFile> iterator = limit(deleteFiles, maxSize);
+ iterator.forEachRemaining(deleteFile -> {
+ splits.add(toIcebergSplit(deleteFile));
+ });
+ return completedFuture(new ConnectorSplitBatch(splits.build(), isFinished()));
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ deleteFiles.close();
+ // TODO: remove this once org.apache.iceberg.io.CloseableIterator#withClose
+ // correctly releases the resources held by the iterator
+ // (and make the field final)
+ deleteFiles = CloseableIterator.empty();
+ }
+ catch (IOException e) {
+ throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, e);
+ }
+ }
+
+ private ConnectorSplit toIcebergSplit(DeleteFile deleteFile)
+ {
+ return splitFromDeleteFile(deleteFile);
+ }
+
+ private IcebergSplit splitFromDeleteFile(DeleteFile deleteFile)
+ {
+ return new IcebergSplit(
+ deleteFile.path().toString(),
+ 0,
+ deleteFile.fileSizeInBytes(),
+ deleteFile.format(),
+ ImmutableList.of(),
+ getPartitionKeys(specById.get(deleteFile.specId()), deleteFile.partition()),
+ getNodeSelectionStrategy(session),
+ SplitWeight.standard(),
+ ImmutableList.of(),
+ Optional.empty(),
+ IcebergUtil.getDataSequenceNumber(deleteFile));
+ }
+}
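EqualityDeletesSplitSource maps each equality delete file in the snapshot to exactly one IcebergSplit, handing them out in batches of at most maxSize. A minimal sketch of how a caller could drain such a ConnectorSplitSource follows (the batch size below is an illustrative placeholder, not a value Presto necessarily uses).

    import com.facebook.presto.spi.ConnectorSplit;
    import com.facebook.presto.spi.ConnectorSplitSource;
    import com.facebook.presto.spi.connector.ConnectorPartitionHandle;

    import java.util.ArrayList;
    import java.util.List;

    final class SplitSourceDrainSketch
    {
        static List<ConnectorSplit> drain(ConnectorSplitSource source, ConnectorPartitionHandle partitionHandle)
        {
            List<ConnectorSplit> splits = new ArrayList<>();
            while (!source.isFinished()) {
                // getNextBatch returns a CompletableFuture; join() blocks until the batch is ready
                splits.addAll(source.getNextBatch(partitionHandle, 1024).join().getSplits());
            }
            source.close();
            return splits;
        }
    }

Because isFinished() simply checks hasNext() on the filtered delete-file iterator, the loop ends once every equality delete file has been turned into a split.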
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java
new file mode 100644
index 0000000000000..a411566ff8d9c
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.optimizer;
+
+import com.facebook.presto.common.function.OperatorType;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.BooleanType;
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.iceberg.ColumnIdentity;
+import com.facebook.presto.iceberg.IcebergAbstractMetadata;
+import com.facebook.presto.iceberg.IcebergColumnHandle;
+import com.facebook.presto.iceberg.IcebergMetadataColumn;
+import com.facebook.presto.iceberg.IcebergTableHandle;
+import com.facebook.presto.iceberg.IcebergTableName;
+import com.facebook.presto.iceberg.IcebergTableType;
+import com.facebook.presto.iceberg.IcebergTransactionManager;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPlanOptimizer;
+import com.facebook.presto.spi.ConnectorPlanRewriter;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.TableHandle;
+import com.facebook.presto.spi.VariableAllocator;
+import com.facebook.presto.spi.function.FunctionHandle;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.plan.ConnectorJoinNode;
+import com.facebook.presto.spi.plan.EquiJoinClause;
+import com.facebook.presto.spi.plan.FilterNode;
+import com.facebook.presto.spi.plan.JoinType;
+import com.facebook.presto.spi.plan.PlanNode;
+import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
+import com.facebook.presto.spi.plan.TableScanNode;
+import com.facebook.presto.spi.relation.CallExpression;
+import com.facebook.presto.spi.relation.RowExpression;
+import com.facebook.presto.spi.relation.SpecialFormExpression;
+import com.facebook.presto.spi.relation.VariableReferenceExpression;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
+import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
+import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
+import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
+import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
+import static com.facebook.presto.iceberg.IcebergSessionProperties.isDeleteToJoinPushdownEnabled;
+import static com.facebook.presto.iceberg.IcebergUtil.getDeleteFiles;
+import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
+import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
+import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This optimizer implements equality deletes as a join, rather than having each split read the delete files and apply them.
+ * This approach significantly enhances performance for equality deletes, as most delete files will apply to most splits,
+ * and opening the delete file in each split incurs considerable overhead. Usually, the delete files are relatively small
+ * and can be broadcast easily. Each delete file may have a different schema, though typically there will be only a few delete
+ * schemas, often just one (the primary key).
+ *
+ * For example, consider the following query:
+ * SELECT * FROM table;
+ * With 2 delete schemas: (pk), (orderid), the query will be transformed into:
+ *
+ * SELECT "$data_sequence_number", * FROM table
+ * LEFT JOIN "table$equality_deletes1" d1 ON left.pk = d1.pk AND left."$data_sequence_number" < d1."$data_sequence_number" -- Find deletes by schema 1
+ * LEFT JOIN "table$equality_deletes2" d2 ON left.orderid = d2.orderid AND left."$data_sequence_number" < d2."$data_sequence_number" -- Find deletes by schema 2
+ * WHERE COALESCE(d1."$data_sequence_number", d2."$data_sequence_number") IS NULL -- None of the delete files had a delete for this row
+ *
+ * Note that table$equality_deletes1 and table$equality_deletes2 are different tables, each containing only the delete files with the schema for this join.
+ */
+
+public class IcebergEqualityDeleteAsJoin
+ implements ConnectorPlanOptimizer
+{
+ private final StandardFunctionResolution functionResolution;
+ private final IcebergTransactionManager transactionManager;
+ private final TypeManager typeManager;
+
+ IcebergEqualityDeleteAsJoin(StandardFunctionResolution functionResolution,
+ IcebergTransactionManager transactionManager,
+ TypeManager typeManager)
+ {
+ this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
+ this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator)
+ {
+ if (!isDeleteToJoinPushdownEnabled(session)) {
+ return maxSubplan;
+ }
+ return rewriteWith(new DeleteAsJoinRewriter(functionResolution,
+ transactionManager, idAllocator, session, typeManager, variableAllocator), maxSubplan);
+ }
+
+ private static class DeleteAsJoinRewriter
+ extends ConnectorPlanRewriter<Void>
+ {
+ private final ConnectorSession session;
+ private final StandardFunctionResolution functionResolution;
+ private final PlanNodeIdAllocator idAllocator;
+ private final IcebergTransactionManager transactionManager;
+ private final TypeManager typeManager;
+ private final VariableAllocator variableAllocator;
+
+ public DeleteAsJoinRewriter(
+ StandardFunctionResolution functionResolution,
+ IcebergTransactionManager transactionManager,
+ PlanNodeIdAllocator idAllocator,
+ ConnectorSession session,
+ TypeManager typeManager,
+ VariableAllocator variableAllocator)
+ {
+ this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
+ this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+ this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
+ this.session = requireNonNull(session, "session is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null");
+ }
+
+ @Override
+ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
+ {
+ TableHandle table = node.getTable();
+ IcebergTableHandle icebergTableHandle = (IcebergTableHandle) table.getConnectorHandle();
+ IcebergTableName tableName = icebergTableHandle.getIcebergTableName();
+ if (!tableName.getSnapshotId().isPresent() || tableName.getTableType() != IcebergTableType.DATA) {
+ // Node is already optimized or not ready for planning
+ return node;
+ }
+
+ IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) transactionManager.get(table.getTransaction());
+ Table icebergTable = getIcebergTable(metadata, session, icebergTableHandle.getSchemaTableName());
+
+ // Collect info about each unique delete schema to join by
+ ImmutableMap<Set<Integer>, DeleteSetInfo> deleteSchemas = collectDeleteInformation(icebergTable, icebergTableHandle, tableName.getSnapshotId().get());
+
+ if (deleteSchemas.isEmpty()) {
+ // no equality deletes
+ return node;
+ }
+
+ // Add all the fields required by the join that were not added by the user's query
+ ImmutableMap<VariableReferenceExpression, IcebergColumnHandle> unselectedAssignments = createAssignmentsForUnselectedFields(node, deleteSchemas, icebergTable);
+ TableScanNode updatedTableScan = createNewRoot(node, icebergTableHandle, tableName, unselectedAssignments, table);
+
+ Map<Integer, VariableReferenceExpression> reverseAssignmentsMap = updatedTableScan
+ .getAssignments()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(assignment -> ((IcebergColumnHandle) (assignment.getValue())).getId(), Map.Entry::getKey));
+
+ List<RowExpression> deleteVersionColumns = new ArrayList<>();
+ PlanNode parentNode = updatedTableScan;
+ // For each unique delete schema add a join that applies those equality deletes
+ for (Map.Entry<Set<Integer>, DeleteSetInfo> entry : deleteSchemas.entrySet()) {
+ DeleteSetInfo deleteGroupInfo = entry.getValue();
+
+ List<Types.NestedField> deleteFields = deleteGroupInfo
+ .equalityFieldIds
+ .stream()
+ .map(fieldId -> icebergTable.schema().findField(fieldId))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ VariableReferenceExpression joinSequenceNumber = toVariableReference(DATA_SEQUENCE_NUMBER_COLUMN_HANDLE);
+ deleteVersionColumns.add(joinSequenceNumber);
+ ImmutableMap<VariableReferenceExpression, IcebergColumnHandle> deleteColumnAssignments = ImmutableMap.<VariableReferenceExpression, IcebergColumnHandle>builder()
+ .putAll(deleteGroupInfo.allFields(icebergTable.schema()).stream().collect(Collectors.toMap(this::toVariableReference, this::toIcebergColumnHandle)))
+ .put(joinSequenceNumber, DATA_SEQUENCE_NUMBER_COLUMN_HANDLE)
+ .build();
+
+ // ON source.delete_column = deletes.delete_column, ...
+ Set<EquiJoinClause> clauses = deleteColumnAssignments
+ .entrySet()
+ .stream()
+ .filter(assignment -> !IcebergMetadataColumn.isMetadataColumnId(((IcebergColumnHandle) (assignment.getValue())).getId()))
+ .map(assignment -> {
+ VariableReferenceExpression left = reverseAssignmentsMap.get(((IcebergColumnHandle) (assignment.getValue())).getId());
+ VariableReferenceExpression right = assignment.getKey();
+ return new EquiJoinClause(left, right);
+ }).collect(Collectors.toSet());
+
+ FunctionHandle lessThan = functionResolution.comparisonFunction(OperatorType.LESS_THAN, BigintType.BIGINT, BigintType.BIGINT);
+
+ // AND source.$data_sequence_number < deletes.$data_sequence_number
+ RowExpression versionFilter = new CallExpression(lessThan.getName(),
+ lessThan,
+ BooleanType.BOOLEAN,
+ Collections.unmodifiableList(Arrays.asList(reverseAssignmentsMap.get(DATA_SEQUENCE_NUMBER.getId()), joinSequenceNumber)));
+
+ TableScanNode deleteTableScan = createDeletesTableScan(deleteColumnAssignments,
+ icebergTableHandle,
+ tableName,
+ deleteFields,
+ table,
+ deleteGroupInfo);
+
+ parentNode = new ConnectorJoinNode(idAllocator.getNextId(),
+ Arrays.asList(parentNode, deleteTableScan),
+ Optional.empty(),
+ JoinType.LEFT,
+ clauses,
+ Sets.newHashSet(versionFilter),
+ Optional.empty(), // Allow stats to determine join distribution
+ Stream.concat(parentNode.getOutputVariables().stream(), deleteTableScan.getOutputVariables().stream()).collect(Collectors.toList()));
+ }
+
+ FilterNode filter = new FilterNode(Optional.empty(), idAllocator.getNextId(), Optional.empty(), parentNode,
+ new SpecialFormExpression(SpecialFormExpression.Form.IS_NULL, BooleanType.BOOLEAN,
+ new SpecialFormExpression(SpecialFormExpression.Form.COALESCE, BigintType.BIGINT, deleteVersionColumns)));
+
+ return filter;
+ }
+
+ private static ImmutableMap<Set<Integer>, DeleteSetInfo> collectDeleteInformation(Table icebergTable,
+ IcebergTableHandle icebergTableHandle,
+ long snapshotId)
+ {
+ // Delete schemas can repeat, so use a mutable HashMap to dedupe; it is converted to an immutable map at the end of the function.
+ HashMap<Set<Integer>, DeleteSetInfo> deleteInformations = new HashMap<>();
+ try (CloseableIterator<DeleteFile> files =
+ getDeleteFiles(icebergTable, snapshotId, icebergTableHandle.getPredicate(), Optional.empty(), Optional.empty()).iterator()) {
+ files.forEachRemaining(delete -> {
+ if (delete.content() == FileContent.EQUALITY_DELETES) {
+ ImmutableMap.Builder<Integer, PartitionFieldInfo> partitionFieldsBuilder = new ImmutableMap.Builder<>();
+ PartitionSpec partitionSpec = icebergTable.specs().get(delete.specId());
+ Types.StructType partitionType = partitionSpec.partitionType();
+ // PartitionField ids are unique across the entire table in v2. We can assume we are in v2 since v1 doesn't have equality deletes
+ partitionSpec.fields().forEach(f -> partitionFieldsBuilder.put(f.fieldId(), new PartitionFieldInfo(partitionType.field(f.fieldId()), f)));
+ ImmutableMap<Integer, PartitionFieldInfo> partitionFields = partitionFieldsBuilder.build();
+ HashSet<Integer> result = new HashSet<>();
+ result.addAll(delete.equalityFieldIds());
+ result.addAll(partitionFields.keySet());
+ deleteInformations.put(ImmutableSet.copyOf(result), new DeleteSetInfo(partitionFields, delete.equalityFieldIds()));
+ }
+ });
+ }
+ catch (IOException e) {
+ throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to read equality delete information", e);
+ }
+ return ImmutableMap.copyOf(deleteInformations);
+ }
+
+ private TableScanNode createDeletesTableScan(ImmutableMap<VariableReferenceExpression, IcebergColumnHandle> deleteColumnAssignments,
+ IcebergTableHandle icebergTableHandle,
+ IcebergTableName tableName,
+ List deleteFields,
+ TableHandle table,
+ DeleteSetInfo deleteInfo)
+ {
+ List<VariableReferenceExpression> outputs = deleteColumnAssignments.keySet().asList();
+ IcebergTableHandle deletesTableHandle = new IcebergTableHandle(icebergTableHandle.getSchemaName(),
+ new IcebergTableName(tableName.getTableName(),
+ IcebergTableType.EQUALITY_DELETES, // Read equality deletes instead of data
+ tableName.getSnapshotId(),
+ Optional.empty()),
+ icebergTableHandle.isSnapshotSpecified(),
+ icebergTableHandle.getPredicate(),
+ Optional.of(SchemaParser.toJson(new Schema(deleteFields))),
+ Optional.of(deleteInfo.partitionFields.keySet()), // Enforce reading only delete files that match this schema
+ Optional.of(deleteInfo.equalityFieldIds));
+
+ return new TableScanNode(Optional.empty(),
+ idAllocator.getNextId(),
+ new TableHandle(table.getConnectorId(), deletesTableHandle, table.getTransaction(), table.getLayout(), table.getDynamicFilter()),
+ outputs,
+ deleteColumnAssignments,
+ TupleDomain.all(),
+ TupleDomain.all());
+ }
+
+ /**
+ * - Updates the table handle to DATA_WITHOUT_EQUALITY_DELETES so that the page source for this node no longer applies equality deletes.
+ * - Adds extra assignments and outputs that are needed by the join
+ */
+ private TableScanNode createNewRoot(TableScanNode node, IcebergTableHandle icebergTableHandle, IcebergTableName tableName, ImmutableMap<VariableReferenceExpression, IcebergColumnHandle> unselectedAssignments, TableHandle table)
+ {
+ IcebergTableHandle updatedHandle = new IcebergTableHandle(icebergTableHandle.getSchemaName(),
+ new IcebergTableName(tableName.getTableName(),
+ IcebergTableType.DATA_WITHOUT_EQUALITY_DELETES, // Don't apply equality deletes in the split
+ tableName.getSnapshotId(),
+ tableName.getChangelogEndSnapshot()),
+ icebergTableHandle.isSnapshotSpecified(),
+ icebergTableHandle.getPredicate(),
+ icebergTableHandle.getTableSchemaJson(),
+ icebergTableHandle.getPartitionSpecId(),
+ icebergTableHandle.getEqualityFieldIds());
+
+ VariableReferenceExpression dataSequenceNumberVariableReference = toVariableReference(DATA_SEQUENCE_NUMBER_COLUMN_HANDLE);
+ ImmutableMap.Builder<VariableReferenceExpression, ColumnHandle> assignmentsBuilder = ImmutableMap.<VariableReferenceExpression, ColumnHandle>builder()
+ .put(dataSequenceNumberVariableReference, DATA_SEQUENCE_NUMBER_COLUMN_HANDLE)
+ .putAll(unselectedAssignments)
+ .putAll(node.getAssignments());
+ ImmutableList.Builder<VariableReferenceExpression> outputsBuilder = ImmutableList.builder();
+ outputsBuilder.addAll(node.getOutputVariables());
+ if (!node.getAssignments().containsKey(dataSequenceNumberVariableReference)) {
+ outputsBuilder.add(dataSequenceNumberVariableReference);
+ }
+ outputsBuilder.addAll(unselectedAssignments.keySet());
+
+ return new TableScanNode(node.getSourceLocation(),
+ node.getId(),
+ Optional.of(node),
+ new TableHandle(table.getConnectorId(), updatedHandle, table.getTransaction(), table.getLayout(), table.getDynamicFilter()),
+ outputsBuilder.build(),
+ assignmentsBuilder.build(),
+ node.getTableConstraints(),
+ node.getCurrentConstraint(),
+ node.getEnforcedConstraint());
+ }
+
+ /**
+ * Calculate the fields required by the join that the query did not select, and create assignments for them so they can be added to the table scan
+ */
+ private ImmutableMap<VariableReferenceExpression, IcebergColumnHandle> createAssignmentsForUnselectedFields(TableScanNode node,
+ ImmutableMap<Set<Integer>, DeleteSetInfo> deleteInfos,
+ Table icebergTable)
+ {
+ Set<Integer> selectedFields = node.getAssignments().values().stream().map(f -> ((IcebergColumnHandle) f).getId()).collect(Collectors.toSet());
+ Set<Integer> unselectedFields = Sets.difference(deleteInfos.keySet().stream().reduce(Sets::union).orElseGet(Collections::emptySet), selectedFields);
+ ImmutableMap.Builder<VariableReferenceExpression, IcebergColumnHandle> unselectedAssignmentsBuilder = ImmutableMap.builder();
+ Map<Integer, PartitionFieldInfo> partitionFields = deleteInfos.values().stream()
+ .flatMap(info -> info.getPartitionFields().entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (existing, replacement) -> existing));
+ unselectedFields
+ .forEach(fieldId -> {
+ if (partitionFields.containsKey(fieldId)) {
+ PartitionFieldInfo partitionFieldInfo = partitionFields.get(fieldId);
+ PartitionField partitionField = partitionFieldInfo.getPartitionField();
+ Types.NestedField sourceField = icebergTable.schema().findField(partitionField.sourceId());
+ if (!partitionField.transform().isIdentity()) {
+ Type partitionFieldType = partitionField.transform().getResultType(sourceField.type());
+ VariableReferenceExpression variableReference = variableAllocator.newVariable(partitionField.name(), toPrestoType(partitionFieldType, typeManager));
+ IcebergColumnHandle columnHandle = new IcebergColumnHandle(
+ ColumnIdentity.createColumnIdentity(partitionField.name(), partitionField.fieldId(), partitionFieldType),
+ toPrestoType(partitionFieldType, typeManager),
+ Optional.empty(),
+ PARTITION_KEY);
+ unselectedAssignmentsBuilder.put(variableReference, columnHandle);
+ }
+ else if (!selectedFields.contains(sourceField.fieldId())) {
+ unselectedAssignmentsBuilder.put(
+ variableAllocator.newVariable(sourceField.name(), toPrestoType(sourceField.type(), typeManager)),
+ IcebergColumnHandle.create(sourceField, typeManager, REGULAR));
+ }
+ }
+ else {
+ Types.NestedField schemaField = icebergTable.schema().findField(fieldId);
+ unselectedAssignmentsBuilder.put(
+ variableAllocator.newVariable(schemaField.name(), toPrestoType(schemaField.type(), typeManager)),
+ IcebergColumnHandle.create(schemaField, typeManager, REGULAR));
+ }
+ });
+ return unselectedAssignmentsBuilder.build();
+ }
+
+ private VariableReferenceExpression toVariableReference(IcebergColumnHandle columnHandle)
+ {
+ return variableAllocator.newVariable(columnHandle.getName(), columnHandle.getType());
+ }
+
+ private IcebergColumnHandle toIcebergColumnHandle(Types.NestedField field)
+ {
+ ColumnIdentity columnIdentity = new ColumnIdentity(field.fieldId(), field.name(), ColumnIdentity.TypeCategory.PRIMITIVE, Collections.emptyList());
+ return new IcebergColumnHandle(columnIdentity, toPrestoType(field.type(), typeManager), Optional.empty(), REGULAR);
+ }
+
+ private VariableReferenceExpression toVariableReference(Types.NestedField field)
+ {
+ return variableAllocator.newVariable(field.name(), toPrestoType(field.type(), typeManager));
+ }
+
+ private static class PartitionFieldInfo
+ {
+ private final Types.NestedField nestedField;
+ private final PartitionField partitionField;
+
+ private PartitionFieldInfo(Types.NestedField nestedField, PartitionField partitionField)
+ {
+ this.nestedField = nestedField;
+ this.partitionField = partitionField;
+ }
+
+ public PartitionField getPartitionField()
+ {
+ return partitionField;
+ }
+ }
+
+ private static class DeleteSetInfo
+ {
+ private final ImmutableMap<Integer, PartitionFieldInfo> partitionFields;
+ private final Set<Integer> equalityFieldIds;
+
+ private DeleteSetInfo(ImmutableMap<Integer, PartitionFieldInfo> partitionFields,
+ List<Integer> equalityFieldIds)
+ {
+ this.partitionFields = requireNonNull(partitionFields, "partitionFields is null");
+ this.equalityFieldIds = ImmutableSet.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null"));
+ }
+
+ public ImmutableMap<Integer, PartitionFieldInfo> getPartitionFields()
+ {
+ return partitionFields;
+ }
+
+ public List<Types.NestedField> allFields(Schema schema)
+ {
+ return Stream.concat(equalityFieldIds
+ .stream()
+ .map(schema::findField),
+ partitionFields
+ .values()
+ .stream()
+ .map(partitionFieldInfo -> {
+ if (partitionFieldInfo.partitionField.transform().isIdentity()) {
+ return schema.findField(partitionFieldInfo.partitionField.sourceId());
+ }
+ return partitionFieldInfo.nestedField;
+ })).collect(Collectors.toList());
+ }
+ }
+ }
+}
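The rewrite hinges on grouping delete files by delete schema: collectDeleteInformation keys each file by the union of its equality field ids and its partition field ids, so all files sharing a schema are served by one "$equality_deletes" scan and one left join. The standalone sketch below is a simplified, hypothetical version of that grouping, not the optimizer's actual code.

    import com.google.common.collect.ImmutableSet;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileContent;
    import org.apache.iceberg.Table;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    final class DeleteSchemaGroupingSketch
    {
        // Group equality delete files by (equality field ids + partition field ids);
        // each resulting key corresponds to one delete-table scan and one join.
        static Map<Set<Integer>, List<DeleteFile>> groupBySchema(Table table, Iterable<DeleteFile> deleteFiles)
        {
            Map<Set<Integer>, List<DeleteFile>> bySchema = new HashMap<>();
            for (DeleteFile delete : deleteFiles) {
                if (delete.content() != FileContent.EQUALITY_DELETES) {
                    continue;
                }
                Set<Integer> key = new HashSet<>(delete.equalityFieldIds());
                table.specs().get(delete.specId()).fields().forEach(field -> key.add(field.fieldId()));
                bySchema.computeIfAbsent(ImmutableSet.copyOf(key), ignored -> new ArrayList<>()).add(delete);
            }
            return bySchema;
        }
    }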
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java
index 79832fd866336..2382d9337de67 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java
@@ -134,7 +134,9 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext context)
oldTableHandle.getIcebergTableName(),
oldTableHandle.isSnapshotSpecified(),
simplifiedColumnDomain,
- oldTableHandle.getTableSchemaJson());
+ oldTableHandle.getTableSchemaJson(),
+ oldTableHandle.getPartitionSpecId(),
+ oldTableHandle.getEqualityFieldIds());
TableScanNode newTableScan = new TableScanNode(
tableScan.getSourceLocation(),
tableScan.getId(),
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizerProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizerProvider.java
index 5485bc9c7d76c..c2efe7ad00c0b 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizerProvider.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizerProvider.java
@@ -31,6 +31,7 @@ public class IcebergPlanOptimizerProvider
implements ConnectorPlanOptimizerProvider
{
private final Set<ConnectorPlanOptimizer> planOptimizers;
+ private final Set<ConnectorPlanOptimizer> logicalPlanOptimizers;
@Inject
public IcebergPlanOptimizerProvider(
@@ -49,12 +50,16 @@ public IcebergPlanOptimizerProvider(
new IcebergPlanOptimizer(functionResolution, rowExpressionService, transactionManager),
new IcebergFilterPushdown(rowExpressionService, functionResolution, functionMetadataManager, transactionManager, typeManager),
new IcebergParquetDereferencePushDown(transactionManager, rowExpressionService, typeManager));
+ this.logicalPlanOptimizers = ImmutableSet.<ConnectorPlanOptimizer>builder()
+ .addAll(this.planOptimizers)
+ .add(new IcebergEqualityDeleteAsJoin(functionResolution, transactionManager, typeManager))
+ .build();
}
@Override
public Set<ConnectorPlanOptimizer> getLogicalPlanOptimizers()
{
- return planOptimizers;
+ return logicalPlanOptimizers;
}
@Override
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
index 7b9f8870c5510..09f1a26845321 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
@@ -15,6 +15,8 @@
import com.facebook.presto.Session;
import com.facebook.presto.common.QualifiedObjectName;
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
@@ -62,6 +64,7 @@
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.TableScanUtil;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -74,6 +77,8 @@
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -81,11 +86,16 @@
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
+import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.TEST_CATALOG_DIRECTORY;
import static com.facebook.presto.iceberg.IcebergQueryRunner.TEST_DATA_DIRECTORY;
+import static com.facebook.presto.iceberg.IcebergSessionProperties.DELETE_AS_JOIN_REWRITE_ENABLED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
@@ -859,24 +869,43 @@ public void testTableWithPositionDelete(String fileFormat)
assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE nationkey not in (0 ,8)");
}
- @Test(dataProvider = "fileFormat")
- public void testTableWithEqualityDelete(String fileFormat)
+ @DataProvider(name = "equalityDeleteOptions")
+ public Object[][] equalityDeleteDataProvider()
+ {
+ return new Object[][] {
+ {"PARQUET", false},
+ {"PARQUET", true},
+ {"ORC", false},
+ {"ORC", true}};
+ }
+
+ private Session deleteAsJoinEnabled(boolean joinRewriteEnabled)
+ {
+ return Session.builder(getQueryRunner().getDefaultSession())
+ .setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_ENABLED, String.valueOf(joinRewriteEnabled))
+ .build();
+ }
+
+ @Test(dataProvider = "equalityDeleteOptions")
+ public void testTableWithEqualityDelete(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
+ Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_equality_delete" + randomTableSuffix();
- assertUpdate("CREATE TABLE " + tableName + " with (format = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation", 25);
+ assertUpdate(session, "CREATE TABLE " + tableName + " with (format = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L));
testCheckDeleteFiles(icebergTable, 1, ImmutableList.of(EQUALITY_DELETES));
- assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
- assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
+ assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
}
- @Test(dataProvider = "fileFormat")
- public void testTableWithPositionDeleteAndEqualityDelete(String fileFormat)
+ @Test(dataProvider = "equalityDeleteOptions")
+ public void testTableWithPositionDeleteAndEqualityDelete(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
+ Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " with (format = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
Table icebergTable = updateTable(tableName);
@@ -884,43 +913,157 @@ public void testTableWithPositionDeleteAndEqualityDelete(String fileFormat)
writePositionDeleteToNationTable(icebergTable, dataFilePath, 0);
testCheckDeleteFiles(icebergTable, 1, ImmutableList.of(POSITION_DELETES));
- assertQuery("SELECT count(*) FROM " + tableName, "VALUES 24");
- assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE nationkey != 0");
+ assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 24");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE nationkey != 0");
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L));
testCheckDeleteFiles(icebergTable, 2, ImmutableList.of(POSITION_DELETES, EQUALITY_DELETES));
- assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1 AND nationkey != 0");
- assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1 AND nationkey != 0");
+ assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1 AND nationkey != 0");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1 AND nationkey != 0");
}
- @Test(dataProvider = "fileFormat")
- public void testTableWithPositionDeletesAndEqualityDeletes(String fileFormat)
+ @Test(dataProvider = "equalityDeleteOptions")
+ public void testEqualityDeletesWithPartitions(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
+ Session session = deleteAsJoinEnabled(joinRewriteEnabled);
+ String tableName = "test_v2_row_delete_" + randomTableSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " with (format = '" + fileFormat + "', partitioning = ARRAY['nationkey']) AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
+ Table icebergTable = updateTable(tableName);
+
+ List<Long> partitions = Arrays.asList(1L, 2L, 3L, 17L, 24L);
+ for (long nationKey : partitions) {
+ writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L), ImmutableMap.of("nationkey", nationKey));
+ }
+ testCheckDeleteFiles(icebergTable, partitions.size(), partitions.stream().map(i -> EQUALITY_DELETES).collect(Collectors.toList()));
+ assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
+ assertQuery(session, "SELECT name FROM " + tableName, "SELECT name FROM nation WHERE regionkey != 1");
+ }
+
+ @Test(dataProvider = "equalityDeleteOptions")
+ public void testEqualityDeletesWithHiddenPartitions(String fileFormat, boolean joinRewriteEnabled)
+ throws Exception
+ {
+ Session session = deleteAsJoinEnabled(joinRewriteEnabled);
+ String tableName = "test_v2_row_delete_" + randomTableSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " with (format = '" + fileFormat + "', partitioning = ARRAY['bucket(nationkey,100)']) AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
+ Table icebergTable = updateTable(tableName);
+
+ PartitionTransforms.ColumnTransform columnTransform = PartitionTransforms.getColumnTransform(icebergTable.spec().fields().get(0), BIGINT);
+ BlockBuilder builder = BIGINT.createFixedSizeBlockBuilder(5);
+ List<Long> partitions = Arrays.asList(1L, 2L, 3L, 17L, 24L);
+ partitions.forEach(p -> BIGINT.writeLong(builder, p));
+ Block partitionsBlock = columnTransform.getTransform().apply(builder.build());
+ for (int i = 0; i < partitionsBlock.getPositionCount(); i++) {
+ writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L), ImmutableMap.of("nationkey_bucket", partitionsBlock.getInt(i)));
+ String updatedPartitions = partitions.stream().limit(i + 1).map(Object::toString).collect(Collectors.joining(",", "(", ")"));
+ assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT(regionkey = 1 AND nationkey IN " + updatedPartitions + ")");
+ }
+ testCheckDeleteFiles(icebergTable, partitions.size(), partitions.stream().map(i -> EQUALITY_DELETES).collect(Collectors.toList()));
+ assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
+ assertQuery(session, "SELECT name FROM " + tableName, "SELECT name FROM nation WHERE regionkey != 1");
+ }
+
+ @Test(dataProvider = "equalityDeleteOptions")
+ public void testEqualityDeletesWithCompositeKey(String fileFormat, boolean joinRewriteEnabled)
+ throws Exception
+ {
+ Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " with (format = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
Table icebergTable = updateTable(tableName);
+
+ writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 0L, "name", "ALGERIA"));
+ testCheckDeleteFiles(icebergTable, 1, Stream.generate(() -> EQUALITY_DELETES).limit(1).collect(Collectors.toList()));
+ assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT(regionkey = 0 AND name = 'ALGERIA')");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE NOT(regionkey = 0 AND name = 'ALGERIA')");
+ assertQuery(session, "SELECT name FROM " + tableName, "SELECT name FROM nation WHERE NOT(regionkey = 0 AND name = 'ALGERIA')");
+ }
+
+ @Test(dataProvider = "equalityDeleteOptions")
+ public void testEqualityDeletesWithMultipleDeleteSchemas(String fileFormat, boolean joinRewriteEnabled)
+ throws Exception
+ {
+ Session session = deleteAsJoinEnabled(joinRewriteEnabled);
+ String tableName = "test_v2_row_delete_" + randomTableSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " with (format = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
+ Table icebergTable = updateTable(tableName);
+
+ writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L));
+ writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("nationkey", 10L));
+ writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 2L, "nationkey", 9L));
+ testCheckDeleteFiles(icebergTable, 3, Stream.generate(() -> EQUALITY_DELETES).limit(3).collect(Collectors.toList()));
+ assertQuery(session, "SELECT \"$data_sequence_number\", regionkey, nationkey FROM \"" + tableName + "$equality_deletes\"", "VALUES (2, 1, null), (3, null, 10), (4, 2, 9)");
+ assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT(regionkey = 2 AND nationkey = 9) AND nationkey <> 10 AND regionkey <> 1");
+ assertQuery(session, "SELECT regionkey FROM " + tableName, "SELECT regionkey FROM nation WHERE NOT(regionkey = 2 AND nationkey = 9) AND nationkey <> 10 AND regionkey <> 1");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE NOT(regionkey = 2 AND nationkey = 9) AND nationkey <> 10 AND regionkey <> 1");
+ }
+
+ @Test(dataProvider = "equalityDeleteOptions")
+ public void testTableWithPositionDeletesAndEqualityDeletes(String fileFormat, boolean joinRewriteEnabled)
+ throws Exception
+ {
+ Session session = deleteAsJoinEnabled(joinRewriteEnabled);
+ String tableName = "test_v2_row_delete_" + randomTableSuffix();
+ assertUpdate(session, "CREATE TABLE " + tableName + " with (format = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
+ Table icebergTable = updateTable(tableName);
String dataFilePath = (String) computeActual("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1").getOnlyValue();
writePositionDeleteToNationTable(icebergTable, dataFilePath, 0);
testCheckDeleteFiles(icebergTable, 1, ImmutableList.of(POSITION_DELETES));
- assertQuery("SELECT count(*) FROM " + tableName, "VALUES 24");
- assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE nationkey != 0");
+ assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 24");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE nationkey != 0");
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L));
testCheckDeleteFiles(icebergTable, 2, ImmutableList.of(POSITION_DELETES, EQUALITY_DELETES));
- assertQuery("SELECT count(*) FROM " + tableName, "VALUES 19");
- assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1 AND nationkey != 0");
+ assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 19");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1 AND nationkey != 0");
writePositionDeleteToNationTable(icebergTable, dataFilePath, 7);
testCheckDeleteFiles(icebergTable, 3, ImmutableList.of(POSITION_DELETES, POSITION_DELETES, EQUALITY_DELETES));
- assertQuery("SELECT count(*) FROM " + tableName, "VALUES 18");
- assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1 AND nationkey NOT IN (0, 7)");
+ assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 18");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1 AND nationkey NOT IN (0, 7)");
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 2L));
testCheckDeleteFiles(icebergTable, 4, ImmutableList.of(POSITION_DELETES, POSITION_DELETES, EQUALITY_DELETES, EQUALITY_DELETES));
- assertQuery("SELECT count(*) FROM " + tableName, "VALUES 13");
- assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey NOT IN (1, 2) AND nationkey NOT IN (0, 7)");
+ assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 13");
+ assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey NOT IN (1, 2) AND nationkey NOT IN (0, 7)");
+ }
+
+ @Test(dataProvider = "equalityDeleteOptions")
+ public void testEqualityDeletesWithHiddenPartitionsEvolution(String fileFormat, boolean joinRewriteEnabled)
+ throws Exception
+ {
+ Session session = deleteAsJoinEnabled(joinRewriteEnabled);
+ String tableName = "test_v2_row_delete_" + randomTableSuffix();
+ assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (format = '" + fileFormat + "')");
+ assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
+
+ Table icebergTable = updateTable(tableName);
+ writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("a", 2));
+ assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, '1001'), (3, '1003')");
+
+ assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c int WITH (partitioning = 'bucket(2)')");
+ assertUpdate("INSERT INTO " + tableName + " VALUES (6, '1004', 1), (6, '1006', 2)", 2);
+ icebergTable = updateTable(tableName);
+ PartitionTransforms.ColumnTransform columnTransform = PartitionTransforms.getColumnTransform(icebergTable.spec().fields().get(0), INTEGER);
+ BlockBuilder builder = BIGINT.createFixedSizeBlockBuilder(1);
+ List<Integer> partitions = Arrays.asList(1, 2);
+ partitions.forEach(p -> BIGINT.writeLong(builder, p));
+ Block partitionsBlock = columnTransform.getTransform().apply(builder.build());
+ writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("a", 6, "c", 2),
+ ImmutableMap.of("c_bucket", partitionsBlock.getInt(0)));
+ assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (3, '1003', NULL), (6, '1004', 1)");
+
+ assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN d varchar WITH (partitioning = 'truncate(2)')");
+ assertUpdate("INSERT INTO " + tableName + " VALUES (6, '1004', 1, 'th001'), (6, '1006', 2, 'th002')", 2);
+ icebergTable = updateTable(tableName);
+ writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("a", 6, "c", 1),
+ ImmutableMap.of("c_bucket", partitionsBlock.getInt(0), "d_trunc", "th"));
+ testCheckDeleteFiles(icebergTable, 3, ImmutableList.of(EQUALITY_DELETES, EQUALITY_DELETES, EQUALITY_DELETES));
+ assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, '1001', NULL, NULL), (3, '1003', NULL, NULL), (6, '1004', 1, NULL), (6, '1006', 2, 'th002')");
}
private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List expectedFileContent)
@@ -967,19 +1110,31 @@ private void writePositionDeleteToNationTable(Table icebergTable, String dataFil
private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Object> overwriteValues)
throws Exception
+ {
+ writeEqualityDeleteToNationTable(icebergTable, overwriteValues, Collections.emptyMap());
+ }
+
+ private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Object> overwriteValues, Map<String, Object> partitionValues)
+ throws Exception
{
File metastoreDir = getDistributedQueryRunner().getCoordinator().getDataDirectory().toFile();
org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + UUID.randomUUID();
FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir);
- Schema deleteRowSchema = icebergTable.schema().select("regionkey");
- EqualityDeleteWriter writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(new org.apache.hadoop.fs.Path(metadataDir, deleteFileName), fs))
+ Schema deleteRowSchema = icebergTable.schema().select(overwriteValues.keySet());
+ Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new org.apache.hadoop.fs.Path(metadataDir, deleteFileName), fs))
.forTable(icebergTable)
.rowSchema(deleteRowSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
- .equalityFieldIds(deleteRowSchema.findField("regionkey").fieldId())
- .overwrite()
- .buildEqualityWriter();
+ .equalityFieldIds(deleteRowSchema.columns().stream().map(Types.NestedField::fieldId).collect(Collectors.toList()))
+ .overwrite();
+
+ if (!partitionValues.isEmpty()) {
+ GenericRecord partitionData = GenericRecord.create(icebergTable.spec().partitionType());
+ writerBuilder.withPartition(partitionData.copy(partitionValues));
+ }
+
+ EqualityDeleteWriter