diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
index 0e562b5d6a24a..7c22eb41c80c6 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
@@ -69,8 +69,6 @@
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileMetadata;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
@@ -79,16 +77,12 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
 import org.apache.iceberg.Transaction;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -101,7 +95,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
@@ -131,6 +124,7 @@
 import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode;
 import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
 import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
+import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
 import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
 import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
 import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
@@ -148,7 +142,6 @@
 import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
 import static com.facebook.presto.iceberg.changelog.ChangelogUtil.getRowTypeFromColumnMeta;
 import static com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizer.getEnforcedColumns;
-import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
 import static com.google.common.base.Verify.verify;
@@ -159,6 +152,9 @@
 import static java.util.Collections.singletonList;
 import static java.util.Objects.requireNonNull;
 import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
+import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
 
 public abstract class IcebergAbstractMetadata
         implements ConnectorMetadata
@@ -769,12 +765,7 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
     {
         IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
         Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
-        try (CloseableIterable<FileScanTask> files = icebergTable.newScan().planFiles()) {
-            removeScanFiles(icebergTable, files);
-        }
-        catch (IOException e) {
-            throw new PrestoException(GENERIC_INTERNAL_ERROR, "failed to scan files for delete", e);
-        }
+        removeScanFiles(icebergTable, TupleDomain.all());
     }
 
     @Override
@@ -883,11 +874,7 @@ public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHa
         // Get partition specs that really need to be checked
         Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
-        Set<Integer> partitionSpecIds = handle.getIcebergTableName().getSnapshotId().map(
-                snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
-                        .map(ManifestFile::partitionSpecId)
-                        .collect(toImmutableSet()))
-                .orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
+        Set<Integer> partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, handle.getIcebergTableName().getSnapshotId());
 
         Set<Integer> enforcedColumnIds = getEnforcedColumns(icebergTable, partitionSpecIds, domainPredicate, session)
                 .stream()
@@ -916,39 +903,29 @@ public OptionalLong metadataDelete(ConnectorSession session, ConnectorTableHandl
             throw new TableNotFoundException(handle.getSchemaTableName());
         }
 
-        TableScan scan = icebergTable.newScan();
         TupleDomain<IcebergColumnHandle> domainPredicate = layoutHandle.getValidPredicate();
-
-        if (!domainPredicate.isAll()) {
-            Expression filterExpression = toIcebergExpression(domainPredicate);
-            scan = scan.filter(filterExpression);
-        }
-
-        try (CloseableIterable<FileScanTask> files = scan.planFiles()) {
-            return OptionalLong.of(removeScanFiles(icebergTable, files));
-        }
-        catch (IOException e) {
-            throw new PrestoException(GENERIC_INTERNAL_ERROR, "failed to scan files for delete", e);
-        }
+        return removeScanFiles(icebergTable, domainPredicate);
     }
 
     /**
-     * Deletes all the files within a particular scan
+     * Deletes all the data files matching the given predicate
      *
     * @return the number of rows deleted from all files
     */
-    private long removeScanFiles(Table icebergTable, Iterable<FileScanTask> scan)
+    private OptionalLong removeScanFiles(Table icebergTable, TupleDomain<IcebergColumnHandle> predicate)
    {
        transaction = icebergTable.newTransaction();
-        DeleteFiles deletes = transaction.newDelete();
-        AtomicLong rowsDeleted = new AtomicLong(0L);
-        scan.forEach(t -> {
-            deletes.deleteFile(t.file());
-            rowsDeleted.addAndGet(t.estimatedRowsCount());
-        });
-        deletes.commit();
+        DeleteFiles deleteFiles = transaction.newDelete()
+                .deleteFromRowFilter(toIcebergExpression(predicate));
+        deleteFiles.commit();
        transaction.commitTransaction();
-        return rowsDeleted.get();
+
+        Map<String, String> summary = icebergTable.currentSnapshot().summary();
+        long deletedRecords = Long.parseLong(summary.getOrDefault(DELETED_RECORDS_PROP, "0"));
+        long removedPositionDeletes = Long.parseLong(summary.getOrDefault(REMOVED_POS_DELETES_PROP, "0"));
+        long removedEqualityDeletes = Long.parseLong(summary.getOrDefault(REMOVED_EQ_DELETES_PROP, "0"));
+        // The removed-row count is inaccurate when equality delete files exist
+        return OptionalLong.of(deletedRecords - removedPositionDeletes - removedEqualityDeletes);
    }
 
     private static long getSnapshotIdForTableVersion(Table table, ConnectorTableVersion tableVersion)
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
index 77d5624e5b9b1..d538776aa6ce8 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
@@ -300,6 +300,15 @@ public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec p
         return columns.build();
     }
 
+    public static Set<Integer> getPartitionSpecsIncludingValidData(Table icebergTable, Optional<Long> snapshotId)
+    {
+        return snapshotId.map(snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
+                .filter(manifestFile -> manifestFile.hasAddedFiles() || manifestFile.hasExistingFiles())
+                .map(ManifestFile::partitionSpecId)
+                .collect(toImmutableSet()))
+                .orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
+    }
+
     public static List<HiveColumnHandle> toHiveColumns(List<IcebergColumnHandle> columns)
     {
         return columns.stream()
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java
index 0c78350fc1c3a..35f8c6b356bb7 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java
@@ -51,7 +51,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
@@ -71,11 +70,11 @@
 import static com.facebook.presto.iceberg.IcebergTableType.DATA;
 import static com.facebook.presto.iceberg.IcebergUtil.getAdjacentValue;
 import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
+import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
 import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
 
@@ -177,12 +176,7 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext<Void> context)
             RowExpression subfieldPredicate = rowExpressionService.getDomainTranslator().toPredicate(subfieldTupleDomain);
 
             // Get partition specs that really need to be checked
-            Set<Integer> partitionSpecIds = tableHandle.getIcebergTableName().getSnapshotId().map(
-                    snapshot -> icebergTable.snapshot(snapshot).allManifests(icebergTable.io()).stream()
-                            .map(ManifestFile::partitionSpecId)
-                            .collect(toImmutableSet()))
-                    .orElseGet(() -> ImmutableSet.copyOf(icebergTable.specs().keySet())); // No snapshot, so no data. This case doesn't matter.
-
+            Set<Integer> partitionSpecIds = getPartitionSpecsIncludingValidData(icebergTable, tableHandle.getIcebergTableName().getSnapshotId());
             Set<IcebergColumnHandle> enforcedColumns = getEnforcedColumns(icebergTable,
                     partitionSpecIds,
                     entireColumnDomain,
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java
index 8a8972fd8b2d2..b08defa50f01b 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java
@@ -1588,6 +1588,58 @@ public void testDeleteOnPartitionedV1Table()
         dropTable(session, tableName);
     }
 
+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(String version, String mode)
+    {
+        String tableName = "test_empty_partition_spec_table";
+        try {
+            // Create a table with no partition
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "')");
+
+            // Do not insert any data, and evolve the partition spec by adding a partition column `c`
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+
+            // Insert data under the new partition spec
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);
+
+            // We can do metadata delete on partition column `c`, because the initial partition spec contains no data
+            assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)");
+        }
+        finally {
+            dropTable(getSession(), tableName);
+        }
+    }
+
+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(String version, String mode)
+    {
+        String errorMessage = "This connector only supports delete where one or more partitions are deleted entirely.*";
+        String tableName = "test_data_deleted_partition_spec_table";
+        try {
+            // Create a table with partition column `a`, and insert some data under this partition spec
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '" + version + "', delete_mode = '" + mode + "', partitioning = ARRAY['a'])");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
+
+            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
+
+            // Metadata delete with a filter on column `c` is not supported, because there is still data in the old partition spec
+            assertQueryFails("DELETE FROM " + tableName + " WHERE c > 3", errorMessage);
+
+            // Do metadata delete on column `a`, because all partition specs contain partition column `a`
+            assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2);
+
+            // Then we can do metadata delete on column `c`, because the old partition spec contains no data now
+            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)");
+        }
+        finally {
+            dropTable(getSession(), tableName);
+        }
+    }
+
     @DataProvider(name = "version_and_mode")
     public Object[][] versionAndMode()
     {
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
index f96c395a3534e..6ae0751e8c5ba 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java
@@ -54,6 +54,7 @@
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -115,6 +116,8 @@
 import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP;
+import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -460,6 +463,35 @@ public void testTruncate()
         assertUpdate("DROP TABLE test_truncate");
     }
 
+    @Test
+    public void testTruncateTableWithDeleteFiles()
+    {
+        String tableName = "test_v2_row_delete_" + randomTableSuffix();
+        try {
+            assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (format_version = '2', delete_mode = 'merge-on-read')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
+
+            // execute row-level deletion
+            assertUpdate("DELETE FROM " + tableName + " WHERE b in ('1002', '1003')", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
+
+            Table icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 1);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 1);
+
+            // execute truncate table
+            assertUpdate("TRUNCATE TABLE " + tableName);
+            assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
+
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 0);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
     @Override
     public void testShowColumns()
     {
@@ -1308,6 +1340,148 @@ public void testPartShowStatsWithFilters()
         assertQuerySucceeds("DROP TABLE showstatsfilters");
     }
 
+    @Test
+    public void testMetadataDeleteOnUnPartitionedTableWithDeleteFiles()
+    {
+        String tableName = "test_v2_row_delete_" + randomTableSuffix();
+        try {
+            assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (format_version = '2', delete_mode = 'merge-on-read')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
+
+            // execute row-level deletion
+            assertUpdate("DELETE FROM " + tableName + " WHERE b in ('1002', '1003')", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
+
+            Table icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 1);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 1);
+
+            // execute whole-table metadata deletion
+            assertUpdate("DELETE FROM " + tableName, 1);
+            assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
+
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 0);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
+    @Test
+    public void testMetadataDeleteOnPartitionedTableWithDeleteFiles()
+    {
+        String tableName = "test_v2_row_delete_" + randomTableSuffix();
+        try {
+            assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (format_version = '2', delete_mode = 'merge-on-read', partitioning = ARRAY['a'])");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
+
+            // execute row-level deletion
+            assertUpdate("DELETE FROM " + tableName + " WHERE b in ('1002', '1003')", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
+
+            Table icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 3);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
+
+            // execute metadata deletion with a filter
+            assertUpdate("DELETE FROM " + tableName + " WHERE a in (2, 3)", 0);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
+
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 1);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+
+            // execute whole-table metadata deletion
+            assertUpdate("DELETE FROM " + tableName, 1);
+            assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
+
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 0);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
+    @Test
+    public void testMetadataDeleteOnV2MorTableWithEmptyUnsupportedSpecs()
+    {
+        String tableName = "test_empty_partition_spec_table";
+        try {
+            // Create a table with no partition
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read')");
+
+            // Do not insert any data, and evolve the partition spec by adding a partition column `c`
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+
+            // Insert data under the new partition spec
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);
+
+            Table icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 4);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+
+            // Do metadata delete on partition column `c`, because the initial partition spec contains no data
+            assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)");
+
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 2);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
+    @Test
+    public void testMetadataDeleteOnV2MorTableWithUnsupportedSpecsWhoseDataAllDeleted()
+    {
+        String tableName = "test_data_deleted_partition_spec_table";
+        try {
+            // Create a table with partition column `a`, and insert some data under this partition spec
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '2', delete_mode = 'merge-on-read', partitioning = ARRAY['a'])");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
+
+            Table icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 2);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+
+            // Evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
+
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 5);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+
+            // Execute a row-level delete with a filter on column `c`, because there is still data in the old partition spec
+            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 5);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
+
+            // Do metadata delete on column `a`, because all partition specs contain partition column `a`
+            assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2);
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 3);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
+
+            // Then do metadata delete on column `c`, because the old partition spec contains no data now
+            assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 0);
+            assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)");
+            icebergTable = loadTable(tableName);
+            assertHasDataFiles(icebergTable.currentSnapshot(), 1);
+            assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
     private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
     {
         // check delete file list
@@ -1450,4 +1624,18 @@ private void testWithAllFileFormats(Session session, BiConsumer<Session, FileFor
+
+    private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount)
+    {
+        Map<String, String> map = snapshot.summary();
+        int totalDataFiles = Integer.valueOf(map.get(TOTAL_DATA_FILES_PROP));
+        assertEquals(totalDataFiles, dataFilesCount);
+    }
+
+    private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount)
+    {
+        Map<String, String> map = snapshot.summary();
+        int totalDeleteFiles = Integer.valueOf(map.get(TOTAL_DELETE_FILES_PROP));
+        assertEquals(totalDeleteFiles, deleteFilesCount);
+    }
 }
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java
index 5ac7542f7942d..af5aef01101fc 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergLogicalPlanner.java
@@ -18,11 +18,13 @@
 import com.facebook.presto.common.Subfield;
 import com.facebook.presto.common.predicate.Domain;
 import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.predicate.ValueSet;
 import com.facebook.presto.common.type.TimeZoneKey;
 import com.facebook.presto.cost.StatsProvider;
 import com.facebook.presto.metadata.FunctionAndTypeManager;
 import com.facebook.presto.metadata.Metadata;
 import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorId;
 import com.facebook.presto.spi.ConnectorTableLayoutHandle;
 import com.facebook.presto.spi.plan.AggregationNode;
 import com.facebook.presto.spi.plan.FilterNode;
@@ -88,6 +90,7 @@
 import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.PARQUET_DEREFERENCE_PUSHDOWN_ENABLED;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
+import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
 import static com.facebook.presto.parquet.ParquetTypeUtils.pushdownColumnNameForSubfield;
 import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND;
 import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.OR;
@@ -635,6 +638,96 @@ public void testFiltersWithPushdownDisable()
         assertUpdate("DROP TABLE test_filters_with_pushdown_disable");
     }
 
+    @Test
+    public void testThoroughlyPushdownForTableWithUnsupportedSpecsIncludingNoData()
+    {
+        // The filter pushdown session property is disabled by default
+        Session sessionWithoutFilterPushdown = getQueryRunner().getDefaultSession();
+        assertEquals(isPushdownFilterEnabled(sessionWithoutFilterPushdown.toConnectorSession(new ConnectorId(ICEBERG_CATALOG))), false);
+
+        String tableName = "test_empty_partition_spec_table";
+        try {
+            // Create a table with no partition
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '1')");
+
+            // Do not insert any data, and evolve the partition spec by adding a partition column `c`
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+
+            // Insert data under the new partition spec
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);
+
+            // Only identity partition column predicates are present, so they are enforced entirely by the tableScan
+            assertPlan(sessionWithoutFilterPushdown, "SELECT a, b FROM " + tableName + " WHERE c > 2",
+                    output(exchange(
+                            strictTableScan(tableName, identityMap("a", "b")))),
+                    plan -> assertTableLayout(
+                            plan,
+                            tableName,
+                            withColumnDomains(ImmutableMap.of(new Subfield(
+                                            "c",
+                                            ImmutableList.of()),
+                                    Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 2L)), false))),
+                            TRUE_CONSTANT,
+                            ImmutableSet.of("c")));
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)");
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+        }
+    }
+
+    @Test
+    public void testThoroughlyPushdownForTableWithUnsupportedSpecsWhoseDataAllDeleted()
+    {
+        // The filter pushdown session property is disabled by default
+        Session sessionWithoutFilterPushdown = getQueryRunner().getDefaultSession();
+        assertEquals(isPushdownFilterEnabled(sessionWithoutFilterPushdown.toConnectorSession(new ConnectorId(ICEBERG_CATALOG))), false);
+
+        String tableName = "test_data_deleted_partition_spec_table";
+        try {
+            // Create a table with partition column `a`, and insert some data under this partition spec
+            assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (format_version = '1', partitioning = ARRAY['a'])");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
+
+            // Then evolve the partition spec by adding a partition column `c`, and insert some data under the new partition spec
+            assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
+            assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
+
+            // The predicate is only partially enforced by the tableScan: the filter on `c` cannot be fully pushed down, so the filterNode keeps `c = 4` and drops the enforced condition `a > 2`
+            assertPlan(sessionWithoutFilterPushdown, "SELECT b FROM " + tableName + " WHERE a > 2 and c = 4",
+                    output(exchange(project(
+                            filter("c = 4",
+                                    strictTableScan(tableName, identityMap("b", "c")))))));
+            assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (2, '1002', NULL), (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)");
+
+            // Do metadata delete on column `a`, because all partition specs contain partition column `a`
+            assertUpdate("DELETE FROM " + tableName + " WHERE a IN (1, 2)", 2);
+
+            // Only identity partition column predicates remain, so they are enforced entirely by the tableScan
+            assertPlan(sessionWithoutFilterPushdown, "SELECT b FROM " + tableName + " WHERE a > 2 and c = 4",
+                    output(exchange(
+                            strictTableScan(tableName, identityMap("b")))),
+                    plan -> assertTableLayout(
+                            plan,
+                            tableName,
+                            withColumnDomains(ImmutableMap.of(
+                                    new Subfield(
+                                            "a",
+                                            ImmutableList.of()),
+                                    Domain.create(ValueSet.ofRanges(greaterThan(INTEGER, 2L)), false),
+                                    new Subfield(
+                                            "c",
+                                            ImmutableList.of()),
+                                    singleValue(INTEGER, 4L))),
+                            TRUE_CONSTANT,
+                            ImmutableSet.of("a", "c")));
+            assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)");
+        }
+        finally {
+            assertUpdate("DROP TABLE " + tableName);
+        }
+    }
+
     @DataProvider(name = "timezones")
     public Object[][] timezones()
     {
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java
index 3298c57ce0756..65c34bc31bbee 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRest.java
@@ -164,4 +164,32 @@ public void testMetadataDeleteOnNonIdentityPartitionColumn(String version, Strin
             super.testMetadataDeleteOnNonIdentityPartitionColumn(version, mode);
         }
     }
+
+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(String version, String mode)
+    {
+        if (version.equals("1")) {
+            // v1 table creation fails due to an Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756)
+            assertThatThrownBy(() -> super.testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(version, mode))
+                    .isInstanceOf(RuntimeException.class);
+        }
+        else {
+            // v2 succeeds
+            super.testMetadataDeleteOnTableWithUnsupportedSpecsIncludingNoData(version, mode);
+        }
+    }
+
+    @Test(dataProvider = "version_and_mode")
+    public void testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(String version, String mode)
+    {
+        if (version.equals("1")) {
+            // v1 table creation fails due to an Iceberg REST catalog bug (see: https://github.com/apache/iceberg/issues/8756)
+            assertThatThrownBy(() -> super.testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(version, mode))
+                    .isInstanceOf(RuntimeException.class);
+        }
+        else {
+            // v2 succeeds
+            super.testMetadataDeleteOnTableWithUnsupportedSpecsWhoseDataAllDeleted(version, mode);
+        }
+    }
 }
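
Reviewer note: to trace the new row-count bookkeeping outside of the diff noise, the sketch below mirrors what the reworked removeScanFiles in IcebergAbstractMetadata does — commit a deleteFromRowFilter against the table inside a transaction, then derive the number of removed live rows from the snapshot summary of the commit that the delete produced. It is a minimal, self-contained illustration; the class name MetadataDeleteSketch and the standalone table/rowFilter parameters are illustrative and not part of the patch.

import java.util.Map;
import java.util.OptionalLong;

import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.expressions.Expression;

import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;

public final class MetadataDeleteSketch
{
    private MetadataDeleteSketch() {}

    // Drop every data file matching rowFilter and estimate how many live rows were removed,
    // using the summary of the snapshot created by the commit.
    public static OptionalLong metadataDelete(Table table, Expression rowFilter)
    {
        Transaction transaction = table.newTransaction();
        DeleteFiles deleteFiles = transaction.newDelete()
                .deleteFromRowFilter(rowFilter);
        deleteFiles.commit();
        transaction.commitTransaction();

        Map<String, String> summary = table.currentSnapshot().summary();
        long deletedRecords = Long.parseLong(summary.getOrDefault(DELETED_RECORDS_PROP, "0"));
        long removedPositionDeletes = Long.parseLong(summary.getOrDefault(REMOVED_POS_DELETES_PROP, "0"));
        long removedEqualityDeletes = Long.parseLong(summary.getOrDefault(REMOVED_EQ_DELETES_PROP, "0"));
        // Rows already masked by position/equality deletes that were dropped alongside the data files
        // were never live, so subtract them from the record count of the removed files.
        return OptionalLong.of(deletedRecords - removedPositionDeletes - removedEqualityDeletes);
    }
}

As the inline comment in the patch notes, the figure remains an estimate when equality delete files still apply to other data files, since those deletes are not attributed to specific rows.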
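Similarly, a compact view of the helper that IcebergAbstractMetadata and IcebergPlanOptimizer now share: only manifests that still carry added or existing files contribute their partition spec ids, so a spec whose data has been entirely deleted no longer blocks metadata delete or predicate enforcement. The class and method names below are illustrative; the logic follows getPartitionSpecsIncludingValidData in the patch.

import java.util.Optional;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Table;

import static com.google.common.collect.ImmutableSet.toImmutableSet;

public final class PartitionSpecSketch
{
    private PartitionSpecSketch() {}

    // Collect the ids of partition specs that still reference live data in the given snapshot.
    // Manifests with neither added nor existing files describe only deleted data, so the specs
    // they were written under are skipped; with no snapshot there is no data, so all specs are returned.
    public static Set<Integer> partitionSpecsWithData(Table table, Optional<Long> snapshotId)
    {
        return snapshotId
                .map(snapshot -> table.snapshot(snapshot).allManifests(table.io()).stream()
                        .filter(manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles())
                        .map(ManifestFile::partitionSpecId)
                        .collect(toImmutableSet()))
                .orElseGet(() -> ImmutableSet.copyOf(table.specs().keySet()));
    }
}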