diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index e1a07b80c6b2..3cd6a47ac8fb 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -895,8 +895,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(Connecto
                         getFileFormat(icebergTable),
                         icebergTable.properties(),
                         maxScannedFileSize,
-                        retryMode != NO_RETRIES,
-                        tableHandle.getEnforcedPredicate().isAll()),
+                        retryMode != NO_RETRIES),
                 icebergTable.location()));
     }
 
@@ -1016,7 +1015,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
         ImmutableSet.Builder<DeleteFile> scannedDeleteFilesBuilder = ImmutableSet.builder();
         splitSourceInfo.stream().map(DataFileWithDeleteFiles.class::cast).forEach(dataFileWithDeleteFiles -> {
             scannedDataFilesBuilder.add(dataFileWithDeleteFiles.getDataFile());
-            scannedDeleteFilesBuilder.addAll(filterDeleteFilesThatWereNotScannedDuringOptimize(dataFileWithDeleteFiles.getDeleteFiles(), optimizeHandle.isWholeTableScan()));
+            scannedDeleteFilesBuilder.addAll(dataFileWithDeleteFiles.getDeleteFiles());
         });
 
         Set<DataFile> scannedDataFiles = scannedDataFilesBuilder.build();
@@ -1073,20 +1072,6 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
         transaction = null;
     }
 
-    private static Set<DeleteFile> filterDeleteFilesThatWereNotScannedDuringOptimize(List<DeleteFile> deleteFiles, boolean isWholeTableScan)
-    {
-        // if whole table was scanned all delete files were read and applied
-        // so it is safe to remove them
-        if (isWholeTableScan) {
-            return ImmutableSet.copyOf(deleteFiles);
-        }
-        return deleteFiles.stream()
-                // equality delete files can be global so we can't clean them up unless we optimize whole table
-                // position delete files cannot be global so it is safe to clean them if they were scanned
-                .filter(deleteFile -> (deleteFile.content() == POSITION_DELETES) || deleteFile.partition() != null)
-                .collect(toImmutableSet());
-    }
-
     @Override
     public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index 5b7834d5e1cc..cd8aa2f04f59 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -235,7 +235,10 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
                 continue;
             }
             if (recordScannedFiles) {
-                scannedFiles.add(new DataFileWithDeleteFiles(scanTask.file(), scanTask.deletes()));
+                // Positional and Equality deletes can only be cleaned up if the whole table has been optimized.
+                // Equality deletes may apply to many files, and position deletes may be grouped together. This makes it difficult to know if they are obsolete.
+                List<DeleteFile> fullyAppliedDeletes = tableHandle.getEnforcedPredicate().isAll() ? scanTask.deletes() : ImmutableList.of();
+                scannedFiles.add(new DataFileWithDeleteFiles(scanTask.file(), fullyAppliedDeletes));
             }
             splits.add(icebergSplit);
         }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java
index 9787358b3954..31a94b32730b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergOptimizeHandle.java
@@ -38,7 +38,6 @@ public class IcebergOptimizeHandle
     private final Map<String, String> tableStorageProperties;
     private final DataSize maxScannedFileSize;
     private final boolean retriesEnabled;
-    private final boolean wholeTableScan;
 
     @JsonCreator
     public IcebergOptimizeHandle(
@@ -49,8 +48,7 @@ public IcebergOptimizeHandle(
             IcebergFileFormat fileFormat,
            Map<String, String> tableStorageProperties,
             DataSize maxScannedFileSize,
-            boolean retriesEnabled,
-            boolean wholeTableScan)
+            boolean retriesEnabled)
     {
         this.snapshotId = snapshotId;
         this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null");
@@ -60,7 +58,6 @@ public IcebergOptimizeHandle(
         this.tableStorageProperties = ImmutableMap.copyOf(requireNonNull(tableStorageProperties, "tableStorageProperties is null"));
         this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
         this.retriesEnabled = retriesEnabled;
-        this.wholeTableScan = wholeTableScan;
     }
 
     @JsonProperty
@@ -111,12 +108,6 @@ public boolean isRetriesEnabled()
         return retriesEnabled;
     }
 
-    @JsonProperty
-    public boolean isWholeTableScan()
-    {
-        return wholeTableScan;
-    }
-
     @Override
     public String toString()
     {
@@ -129,7 +120,6 @@ public String toString()
                 .add("tableStorageProperties", tableStorageProperties)
                 .add("maxScannedFileSize", maxScannedFileSize)
                 .add("retriesEnabled", retriesEnabled)
-                .add("tupleDomainAll", wholeTableScan)
                 .toString();
     }
 }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
index 39fea28b109a..86ccbf15e6a8 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
@@ -223,6 +223,32 @@ public void testOptimizingV2TableDoesntRemoveEqualityDeletesWhenOnlyPartOfTheTab
         Assertions.assertThat(updatedFiles).doesNotContain(initialActiveFiles.stream().filter(path -> !path.contains("regionkey=1")).toArray(String[]::new));
     }
 
+    @Test
+    public void testSelectivelyOptimizingLeavesEqualityDeletes()
+            throws Exception
+    {
+        String tableName = "test_selectively_optimizing_leaves_eq_deletes_" + randomTableSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['nationkey']) AS SELECT * FROM tpch.tiny.nation", 25);
+        Table icebergTable = loadTable(tableName);
+        writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[]{1L})));
+        query("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE nationkey < 5");
+        assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1 OR nationkey != 1");
+        Assertions.assertThat(loadTable(tableName).currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("1");
+    }
+
+    @Test
+    public void testOptimizingWholeTableRemovesEqualityDeletes()
+            throws Exception
+    {
+        String tableName = "test_optimizing_whole_table_removes_eq_deletes_" + randomTableSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['nationkey']) AS SELECT * FROM tpch.tiny.nation", 25);
+        Table icebergTable = loadTable(tableName);
+        writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[]{1L})));
+        query("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
+        assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1 OR nationkey != 1");
+        Assertions.assertThat(loadTable(tableName).currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");
+    }
+
     @Test
     public void testOptimizingV2TableWithEmptyPartitionSpec()
             throws Exception