diff --git a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
index 818529c02479..ba954577aa74 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -167,14 +167,10 @@ private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) throws IOException {
 
   private void validatePreviousDeletes(PositionDeleteIndex index) {
     Preconditions.checkArgument(
-        index.deleteFiles().stream().allMatch(this::isFileScoped),
+        index.deleteFiles().stream().allMatch(ContentFileUtil::isFileScoped),
         "Previous deletes must be file-scoped");
   }
 
-  private boolean isFileScoped(DeleteFile deleteFile) {
-    return ContentFileUtil.referencedDataFile(deleteFile) != null;
-  }
-
   private Collection<CharSequence> sort(Collection<CharSequence> paths) {
     if (paths.size() <= 1) {
       return paths;
diff --git a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
index e4666bd1bd8f..9e4a65be02ae 100644
--- a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
@@ -86,6 +86,10 @@ public static String referencedDataFileLocation(DeleteFile deleteFile) {
     return location != null ? location.toString() : null;
   }
 
+  public static boolean isFileScoped(DeleteFile deleteFile) {
+    return referencedDataFile(deleteFile) != null;
+  }
+
   public static boolean isDV(DeleteFile deleteFile) {
     return deleteFile.format() == FileFormat.PUFFIN;
   }
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 589543f0375a..60941b8d5560 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -39,9 +39,11 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.TestSparkCatalog;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.junit.jupiter.api.BeforeEach;
@@ -99,6 +101,87 @@ public void testDeletePartitionGranularity() throws NoSuchTableException {
     checkDeleteFileGranularity(DeleteGranularity.PARTITION);
   }
 
+  @TestTemplate
+  public void testPositionDeletesAreMaintainedDuringDelete() throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id int, data string) USING iceberg PARTITIONED BY (id) TBLPROPERTIES"
+            + "('%s'='%s', '%s'='%s', '%s'='%s')",
+        tableName,
+        TableProperties.FORMAT_VERSION,
+        2,
+        TableProperties.DELETE_MODE,
+        "merge-on-read",
+        TableProperties.DELETE_GRANULARITY,
+        "file");
+    createBranchIfNeeded();
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(1, "b"),
+            new SimpleRecord(1, "c"),
+            new SimpleRecord(2, "d"),
+            new SimpleRecord(2, "e"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(commitTarget())
+        .append();
+
+    sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1, "b"), row(2, "e")),
+        sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testUnpartitionedPositionDeletesAreMaintainedDuringDelete()
+      throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+            + "('%s'='%s', '%s'='%s', '%s'='%s')",
+        tableName,
+        TableProperties.FORMAT_VERSION,
+        2,
+        TableProperties.DELETE_MODE,
+        "merge-on-read",
+        TableProperties.DELETE_GRANULARITY,
+        "file");
+    createBranchIfNeeded();
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(1, "b"),
+            new SimpleRecord(1, "c"),
+            new SimpleRecord(2, "d"),
+            new SimpleRecord(2, "e"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(commitTarget())
+        .append();
+
+    sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1, "b"), row(2, "e")),
+        sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
+  }
+
   private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
       throws NoSuchTableException {
     createAndInitPartitionedTable();
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index 391fae4ea696..e9cc9d8541ad 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.Map;
 import org.apache.iceberg.ParameterizedTestExtension;
@@ -55,19 +56,82 @@ public void testUpdatePartitionGranularity() {
     checkUpdateFileGranularity(DeleteGranularity.PARTITION);
   }
 
-  private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
-    createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);
+  @TestTemplate
+  public void testUpdateFileGranularityMergesDeleteFiles() {
+    // Range distribution will produce partition scoped deletes which will not be cleaned up
+    assumeThat(distributionMode).isNotEqualToIgnoringCase("range");
 
-    sql(
-        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
-        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+    checkUpdateFileGranularity(DeleteGranularity.FILE);
+    sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = "2";
+    validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");
 
-    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
-    append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
-    append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
-    append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
+    assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(5, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(5, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+  }
 
-    createBranchIfNeeded();
+  @TestTemplate
+  public void testUpdateUnpartitionedFileGranularityMergesDeleteFiles() {
+    // Range distribution will produce partition scoped deletes which will not be cleaned up
+    assumeThat(distributionMode).isNotEqualToIgnoringCase("range");
+    initTable("", DeleteGranularity.FILE);
+
+    sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(5);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = "4";
+    validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(4, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(4, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+
+    sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
+    table.refresh();
+    currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    expectedDeleteFilesCount = "2";
+
+    validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
+    assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(5, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(5, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+  }
+
+  private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
+    initTable("PARTITIONED BY (dep)", deleteGranularity);
 
     sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());
 
@@ -91,4 +155,19 @@ private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
             row(4, "it")),
         sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
   }
+
+  private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) {
+    createAndInitTable("id INT, dep STRING", partitionedBy, null /* empty */);
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
\"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }"); + + createBranchIfNeeded(); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java index 66cda5b82955..18e483f23fc6 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionScanTask; import org.apache.iceberg.PartitionSpec; @@ -48,6 +50,8 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkV2Filters; +import org.apache.iceberg.util.ContentFileUtil; +import org.apache.iceberg.util.DeleteFileSet; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.expressions.NamedReference; @@ -158,6 +162,23 @@ public void filter(Predicate[] predicates) { } } + protected Map rewritableDeletes() { + Map rewritableDeletes = Maps.newHashMap(); + + for (ScanTask task : tasks()) { + FileScanTask fileScanTask = task.asFileScanTask(); + for (DeleteFile deleteFile : fileScanTask.deletes()) { + if (ContentFileUtil.isFileScoped(deleteFile)) { + rewritableDeletes + .computeIfAbsent(fileScanTask.file().location(), ignored -> DeleteFileSet.create()) + .add(deleteFile); + } + } + } + + return rewritableDeletes; + } + // at this moment, Spark can only pass IN filters for a single attribute // if there are multiple filter attributes, Spark will pass two separate IN filters private Expression convertRuntimeFilters(Predicate[] predicates) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 02c87b53e762..18020ee935b6 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -42,8 +43,12 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; +import org.apache.iceberg.data.BaseDeleteLoader; +import org.apache.iceberg.data.DeleteLoader; import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.exceptions.CleanableFailure; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -67,6 +72,7 @@ import org.apache.iceberg.spark.SparkWriteRequirements; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.DeleteFileSet; import org.apache.iceberg.util.StructProjection; import 
 import org.apache.spark.broadcast.Broadcast;
@@ -166,10 +172,23 @@ private class PositionDeltaBatchWrite implements DeltaBatchWrite {
 
     @Override
     public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
-      // broadcast the table metadata as the writer factory will be sent to executors
-      Broadcast<Table> tableBroadcast =
-          sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
-      return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
+      // broadcast large objects since the writer factory will be sent to executors
+      return new PositionDeltaWriteFactory(
+          sparkContext.broadcast(SerializableTableWithSize.copyOf(table)),
+          broadcastRewritableDeletes(),
+          command,
+          context,
+          writeProperties);
+    }
+
+    private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
+      if (context.deleteGranularity() == DeleteGranularity.FILE && scan != null) {
+        Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes();
+        if (rewritableDeletes != null && !rewritableDeletes.isEmpty()) {
+          return sparkContext.broadcast(rewritableDeletes);
+        }
+      }
+      return null;
     }
 
     @Override
@@ -185,6 +204,7 @@ public void commit(WriterCommitMessage[] messages) {
 
       int addedDataFilesCount = 0;
       int addedDeleteFilesCount = 0;
+      int removedDeleteFilesCount = 0;
 
       for (WriterCommitMessage message : messages) {
         DeltaTaskCommit taskCommit = (DeltaTaskCommit) message;
@@ -199,6 +219,11 @@ public void commit(WriterCommitMessage[] messages) {
           addedDeleteFilesCount += 1;
         }
 
+        for (DeleteFile deleteFile : taskCommit.rewrittenDeleteFiles()) {
+          rowDelta.removeDeletes(deleteFile);
+          removedDeleteFilesCount += 1;
+        }
+
         referencedDataFiles.addAll(Arrays.asList(taskCommit.referencedDataFiles()));
       }
 
@@ -227,10 +252,11 @@ public void commit(WriterCommitMessage[] messages) {
 
       String commitMsg =
           String.format(
-              "position delta with %d data files and %d delete files "
+              "position delta with %d data files, %d delete files and %d rewritten delete files"
                   + "(scanSnapshotId: %d, conflictDetectionFilter: %s, isolationLevel: %s)",
               addedDataFilesCount,
               addedDeleteFilesCount,
+              removedDeleteFilesCount,
               scan.snapshotId(),
               conflictDetectionFilter,
               isolationLevel);
@@ -314,18 +340,21 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
   public static class DeltaTaskCommit implements WriterCommitMessage {
     private final DataFile[] dataFiles;
     private final DeleteFile[] deleteFiles;
+    private final DeleteFile[] rewrittenDeleteFiles;
     private final CharSequence[] referencedDataFiles;
 
     DeltaTaskCommit(WriteResult result) {
       this.dataFiles = result.dataFiles();
       this.deleteFiles = result.deleteFiles();
       this.referencedDataFiles = result.referencedDataFiles();
+      this.rewrittenDeleteFiles = result.rewrittenDeleteFiles();
     }
 
     DeltaTaskCommit(DeleteWriteResult result) {
       this.dataFiles = new DataFile[0];
       this.deleteFiles = result.deleteFiles().toArray(new DeleteFile[0]);
       this.referencedDataFiles = result.referencedDataFiles().toArray(new CharSequence[0]);
+      this.rewrittenDeleteFiles = result.rewrittenDeleteFiles().toArray(new DeleteFile[0]);
     }
 
     DataFile[] dataFiles() {
@@ -336,6 +365,10 @@ DeleteFile[] deleteFiles() {
       return deleteFiles;
     }
 
+    DeleteFile[] rewrittenDeleteFiles() {
+      return rewrittenDeleteFiles;
+    }
+
     CharSequence[] referencedDataFiles() {
       return referencedDataFiles;
     }
@@ -343,16 +376,19 @@ CharSequence[] referencedDataFiles() {
 
   private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
     private final Broadcast<Table> tableBroadcast;
+    private final Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast;
     private final Command command;
     private final Context context;
     private final Map<String, String> writeProperties;
 
     PositionDeltaWriteFactory(
         Broadcast<Table> tableBroadcast,
+        Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast,
         Command command,
         Context context,
         Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
+      this.rewritableDeletesBroadcast = rewritableDeletesBroadcast;
       this.command = command;
       this.context = context;
       this.writeProperties = writeProperties;
@@ -385,17 +421,22 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
               .build();
 
       if (command == DELETE) {
-        return new DeleteOnlyDeltaWriter(table, writerFactory, deleteFileFactory, context);
+        return new DeleteOnlyDeltaWriter(
+            table, rewritableDeletes(), writerFactory, deleteFileFactory, context);
 
       } else if (table.spec().isUnpartitioned()) {
         return new UnpartitionedDeltaWriter(
-            table, writerFactory, dataFileFactory, deleteFileFactory, context);
+            table, rewritableDeletes(), writerFactory, dataFileFactory, deleteFileFactory, context);
 
       } else {
         return new PartitionedDeltaWriter(
-            table, writerFactory, dataFileFactory, deleteFileFactory, context);
+            table, rewritableDeletes(), writerFactory, dataFileFactory, deleteFileFactory, context);
       }
     }
+
+    private Map<String, DeleteFileSet> rewritableDeletes() {
+      return rewritableDeletesBroadcast != null ? rewritableDeletesBroadcast.getValue() : null;
+    }
   }
 
   private abstract static class BaseDeltaWriter implements DeltaWriter<InternalRow> {
@@ -437,23 +478,58 @@ protected PartitioningWriter<InternalRow, DataWriteResult> newDataWriter(
 
     // use a fanout writer if the input is unordered no matter whether fanout writers are enabled
     // clustered writers assume that the position deletes are already ordered by file and position
     protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> newDeleteWriter(
-        Table table, SparkFileWriterFactory writers, OutputFileFactory files, Context context) {
+        Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
+        SparkFileWriterFactory writers,
+        OutputFileFactory files,
+        Context context) {
       FileIO io = table.io();
       boolean inputOrdered = context.inputOrdered();
       long targetFileSize = context.targetDeleteFileSize();
       DeleteGranularity deleteGranularity = context.deleteGranularity();
 
-      if (inputOrdered) {
+      if (inputOrdered && rewritableDeletes == null) {
         return new ClusteredPositionDeleteWriter<>(
             writers, files, io, targetFileSize, deleteGranularity);
       } else {
         return new FanoutPositionOnlyDeleteWriter<>(
-            writers, files, io, targetFileSize, deleteGranularity);
+            writers,
+            files,
+            io,
+            targetFileSize,
+            deleteGranularity,
+            rewritableDeletes != null
+                ? new PreviousDeleteLoader(table, rewritableDeletes)
+                : path -> null /* no previous file scoped deletes */);
       }
     }
   }
 
+  private static class PreviousDeleteLoader implements Function<CharSequence, PositionDeleteIndex> {
+    private final Map<String, DeleteFileSet> deleteFiles;
+    private final DeleteLoader deleteLoader;
+
+    PreviousDeleteLoader(Table table, Map<String, DeleteFileSet> deleteFiles) {
+      this.deleteFiles = deleteFiles;
+      this.deleteLoader =
+          new BaseDeleteLoader(
+              deleteFile ->
+                  EncryptingFileIO.combine(table.io(), table.encryption())
+                      .newInputFile(deleteFile));
+    }
+
+    @Override
+    public PositionDeleteIndex apply(CharSequence path) {
+      DeleteFileSet deleteFileSet = deleteFiles.get(path.toString());
+      if (deleteFileSet == null) {
+        return null;
+      }
+
+      return deleteLoader.loadPositionDeletes(deleteFileSet, path);
+    }
+  }
+
   private static class DeleteOnlyDeltaWriter extends BaseDeltaWriter {
     private final PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> delegate;
     private final PositionDelete<InternalRow> positionDelete;
@@ -470,11 +546,13 @@ private static class DeleteOnlyDeltaWriter extends BaseDeltaWriter {
 
     DeleteOnlyDeltaWriter(
         Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
         SparkFileWriterFactory writerFactory,
         OutputFileFactory deleteFileFactory,
         Context context) {
 
-      this.delegate = newDeleteWriter(table, writerFactory, deleteFileFactory, context);
+      this.delegate =
+          newDeleteWriter(table, rewritableDeletes, writerFactory, deleteFileFactory, context);
       this.positionDelete = PositionDelete.create();
       this.io = table.io();
       this.specs = table.specs();
@@ -557,6 +635,7 @@ private abstract static class DeleteAndDataDeltaWriter extends BaseDeltaWriter {
 
     DeleteAndDataDeltaWriter(
         Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
         SparkFileWriterFactory writerFactory,
         OutputFileFactory dataFileFactory,
         OutputFileFactory deleteFileFactory,
@@ -564,7 +643,7 @@ private abstract static class DeleteAndDataDeltaWriter extends BaseDeltaWriter {
       this.delegate =
           new BasePositionDeltaWriter<>(
               newDataWriter(table, writerFactory, dataFileFactory, context),
-              newDeleteWriter(table, writerFactory, deleteFileFactory, context));
+              newDeleteWriter(table, rewritableDeletes, writerFactory, deleteFileFactory, context));
       this.io = table.io();
       this.specs = table.specs();
 
@@ -629,11 +708,12 @@ private static class UnpartitionedDeltaWriter extends DeleteAndDataDeltaWriter {
 
     UnpartitionedDeltaWriter(
         Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
         SparkFileWriterFactory writerFactory,
         OutputFileFactory dataFileFactory,
         OutputFileFactory deleteFileFactory,
         Context context) {
-      super(table, writerFactory, dataFileFactory, deleteFileFactory, context);
+      super(table, rewritableDeletes, writerFactory, dataFileFactory, deleteFileFactory, context);
       this.dataSpec = table.spec();
     }
 
@@ -655,11 +735,12 @@ private static class PartitionedDeltaWriter extends DeleteAndDataDeltaWriter {
 
     PartitionedDeltaWriter(
         Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
         SparkFileWriterFactory writerFactory,
         OutputFileFactory dataFileFactory,
         OutputFileFactory deleteFileFactory,
         Context context) {
-      super(table, writerFactory, dataFileFactory, deleteFileFactory, context);
+      super(table, rewritableDeletes, writerFactory, dataFileFactory, deleteFileFactory, context);
       this.dataSpec = table.spec();
       this.dataPartitionKey = new PartitionKey(dataSpec, context.dataSchema());