Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(Connecto
getFileFormat(icebergTable),
icebergTable.properties(),
maxScannedFileSize,
retryMode != NO_RETRIES,
tableHandle.getEnforcedPredicate().isAll()),
retryMode != NO_RETRIES),
icebergTable.location()));
}

Expand Down Expand Up @@ -1016,7 +1015,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
ImmutableSet.Builder<DeleteFile> scannedDeleteFilesBuilder = ImmutableSet.builder();
splitSourceInfo.stream().map(DataFileWithDeleteFiles.class::cast).forEach(dataFileWithDeleteFiles -> {
scannedDataFilesBuilder.add(dataFileWithDeleteFiles.getDataFile());
scannedDeleteFilesBuilder.addAll(filterDeleteFilesThatWereNotScannedDuringOptimize(dataFileWithDeleteFiles.getDeleteFiles(), optimizeHandle.isWholeTableScan()));
scannedDeleteFilesBuilder.addAll(dataFileWithDeleteFiles.getDeleteFiles());
});

Set<DataFile> scannedDataFiles = scannedDataFilesBuilder.build();
Expand Down Expand Up @@ -1073,20 +1072,6 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
transaction = null;
}

/**
 * Selects the delete files that are safe to drop from the table once OPTIMIZE
 * has rewritten the data files it scanned.
 * <p>
 * Delete files are only returned when the whole table was scanned. After a partial
 * scan nothing is returned: an equality delete may apply to many data files, and
 * position deletes for several data files may be grouped into a single delete file,
 * so a delete file seen next to a scanned data file may still be needed by data
 * files that were not rewritten. Dropping it would bring deleted rows back.
 *
 * @param deleteFiles delete files that were attached to a scanned data file
 * @param isWholeTableScan whether OPTIMIZE read the entire table
 * @return the subset of {@code deleteFiles} that can be removed; empty unless the whole table was scanned
 */
private static Set<DeleteFile> filterDeleteFilesThatWereNotScannedDuringOptimize(List<DeleteFile> deleteFiles, boolean isWholeTableScan)
{
    // if whole table was scanned all delete files were read and applied
    // so it is safe to remove them
    if (isWholeTableScan) {
        return ImmutableSet.copyOf(deleteFiles);
    }
    // partial scan: we cannot prove that any delete file is obsolete, so keep all of them
    return ImmutableSet.of();
}

@Override
public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
continue;
}
if (recordScannedFiles) {
scannedFiles.add(new DataFileWithDeleteFiles(scanTask.file(), scanTask.deletes()));
// Position and equality deletes can only be cleaned up when the whole table has been optimized.
// An equality delete may apply to many data files, and position deletes for several data files may be grouped into one delete file, so after a partial scan we cannot tell whether a delete file is obsolete.
List<org.apache.iceberg.DeleteFile> fullyAppliedDeletes = tableHandle.getEnforcedPredicate().isAll() ? scanTask.deletes() : ImmutableList.of();
scannedFiles.add(new DataFileWithDeleteFiles(scanTask.file(), fullyAppliedDeletes));
}
splits.add(icebergSplit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class IcebergOptimizeHandle
private final Map<String, String> tableStorageProperties;
private final DataSize maxScannedFileSize;
private final boolean retriesEnabled;
private final boolean wholeTableScan;

@JsonCreator
public IcebergOptimizeHandle(
Expand All @@ -49,8 +48,7 @@ public IcebergOptimizeHandle(
IcebergFileFormat fileFormat,
Map<String, String> tableStorageProperties,
DataSize maxScannedFileSize,
boolean retriesEnabled,
boolean wholeTableScan)
boolean retriesEnabled)
{
this.snapshotId = snapshotId;
this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null");
Expand All @@ -60,7 +58,6 @@ public IcebergOptimizeHandle(
this.tableStorageProperties = ImmutableMap.copyOf(requireNonNull(tableStorageProperties, "tableStorageProperties is null"));
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.retriesEnabled = retriesEnabled;
this.wholeTableScan = wholeTableScan;
}

@JsonProperty
Expand Down Expand Up @@ -111,12 +108,6 @@ public boolean isRetriesEnabled()
return retriesEnabled;
}

/**
 * Returns the {@code wholeTableScan} flag stored in this handle; exposed as a JSON property.
 */
@JsonProperty
public boolean isWholeTableScan()
{
    return this.wholeTableScan;
}

@Override
public String toString()
{
Expand All @@ -129,7 +120,6 @@ public String toString()
.add("tableStorageProperties", tableStorageProperties)
.add("maxScannedFileSize", maxScannedFileSize)
.add("retriesEnabled", retriesEnabled)
.add("tupleDomainAll", wholeTableScan)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,32 @@ public void testOptimizingV2TableDoesntRemoveEqualityDeletesWhenOnlyPartOfTheTab
Assertions.assertThat(updatedFiles).doesNotContain(initialActiveFiles.stream().filter(path -> !path.contains("regionkey=1")).toArray(String[]::new));
}

/**
 * Runs OPTIMIZE restricted by a predicate and checks that the equality delete is still
 * applied by reads and still counted in the current snapshot summary.
 */
@Test
public void testSelectivelyOptimizingLeavesEqualityDeletes()
        throws Exception
{
    String tableName = "test_selectively_optimizing_leaves_eq_deletes_" + randomTableSuffix();
    String createTableSql = "CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['nationkey']) AS SELECT * FROM tpch.tiny.nation";
    assertUpdate(createTableSql, 25);

    // Write an equality delete through the shared helper, passing the table's spec
    // and partition data holding the single value 1
    Table table = loadTable(tableName);
    PartitionData deletePartition = new PartitionData(new Long[] {1L});
    writeEqualityDeleteToNationTable(table, Optional.of(table.spec()), Optional.of(deletePartition));

    // Rewrite only the part of the table matched by the predicate
    query("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE WHERE nationkey < 5");

    // Reads must still exclude the row with regionkey = 1 and nationkey = 1
    String expectedRows = "SELECT * FROM nation WHERE regionkey != 1 OR nationkey != 1";
    assertQuery("SELECT * FROM " + tableName, expectedRows);

    // The snapshot summary must still report one equality delete
    String equalityDeleteCount = loadTable(tableName).currentSnapshot().summary().get("total-equality-deletes");
    Assertions.assertThat(equalityDeleteCount).isEqualTo("1");
}

/**
 * Runs OPTIMIZE without a predicate and checks that query results are unchanged while
 * the current snapshot summary no longer reports any equality deletes.
 */
@Test
public void testOptimizingWholeTableRemovesEqualityDeletes()
        throws Exception
{
    String tableName = "test_optimizing_whole_table_removes_eq_deletes_" + randomTableSuffix();
    String createTableSql = "CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['nationkey']) AS SELECT * FROM tpch.tiny.nation";
    assertUpdate(createTableSql, 25);

    // Write an equality delete through the shared helper, passing the table's spec
    // and partition data holding the single value 1
    Table table = loadTable(tableName);
    PartitionData deletePartition = new PartitionData(new Long[] {1L});
    writeEqualityDeleteToNationTable(table, Optional.of(table.spec()), Optional.of(deletePartition));

    // Rewrite the entire table
    query("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");

    // Reads must still exclude the row with regionkey = 1 and nationkey = 1
    String expectedRows = "SELECT * FROM nation WHERE regionkey != 1 OR nationkey != 1";
    assertQuery("SELECT * FROM " + tableName, expectedRows);

    // The snapshot summary must report that no equality deletes remain
    String equalityDeleteCount = loadTable(tableName).currentSnapshot().summary().get("total-equality-deletes");
    Assertions.assertThat(equalityDeleteCount).isEqualTo("0");
}

@Test
public void testOptimizingV2TableWithEmptyPartitionSpec()
throws Exception
Expand Down