From d6ec043978110ed814162d53d1c2ffc07449e594 Mon Sep 17 00:00:00 2001
From: Amogh Jahagirdar
Date: Wed, 15 Jan 2025 09:16:49 -0700
Subject: [PATCH] Spark 3.4: Backport rewriting historical file-scoped deletes (#11273) to 3.4

---
 .../extensions/TestMergeOnReadDelete.java     |  83 +++++++++++++
 .../extensions/TestMergeOnReadUpdate.java     |  99 ++++++++++++++--
 .../spark/source/SparkBatchQueryScan.java     |  21 ++++
 .../spark/source/SparkPositionDeltaWrite.java | 111 +++++++++++++++---
 4 files changed, 289 insertions(+), 25 deletions(-)

diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 5bec21392eba..bd4c5c88eb96 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -39,9 +39,11 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.TestSparkCatalog;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.junit.Assert;
@@ -120,6 +122,87 @@ public void testDeletePartitionGranularity() throws NoSuchTableException {
     checkDeleteFileGranularity(DeleteGranularity.PARTITION);
   }
 
+  @Test
+  public void testPositionDeletesAreMaintainedDuringDelete() throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id int, data string) USING iceberg PARTITIONED BY (id) TBLPROPERTIES"
+            + "('%s'='%s', '%s'='%s', '%s'='%s')",
+        tableName,
+        TableProperties.FORMAT_VERSION,
+        2,
+        TableProperties.DELETE_MODE,
+        "merge-on-read",
+        TableProperties.DELETE_GRANULARITY,
+        "file");
+    createBranchIfNeeded();
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(1, "b"),
+            new SimpleRecord(1, "c"),
+            new SimpleRecord(2, "d"),
+            new SimpleRecord(2, "e"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(commitTarget())
+        .append();
+
+    sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1, "b"), row(2, "e")),
+        sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
+  }
+
+  @Test
+  public void testUnpartitionedPositionDeletesAreMaintainedDuringDelete()
+      throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+            + "('%s'='%s', '%s'='%s', '%s'='%s')",
+        tableName,
+        TableProperties.FORMAT_VERSION,
+        2,
+        TableProperties.DELETE_MODE,
+        "merge-on-read",
+        TableProperties.DELETE_GRANULARITY,
+        "file");
+    createBranchIfNeeded();
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(1, "b"),
+            new SimpleRecord(1, "c"),
+            new SimpleRecord(2, "d"),
+            new SimpleRecord(2, "e"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(commitTarget())
+        .append();
+
+    sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1, "b"), row(2, "e")),
+        sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
+  }
+
   private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
       throws NoSuchTableException {
     createAndInitPartitionedTable();
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index 45ef343b2dfe..51bf54a14aab 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.Map;
 import org.apache.iceberg.PlanningMode;
@@ -75,19 +76,82 @@ public void testUpdatePartitionGranularity() {
     checkUpdateFileGranularity(DeleteGranularity.PARTITION);
   }
 
-  private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
-    createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);
+  @Test
+  public void testUpdateFileGranularityMergesDeleteFiles() {
+    // Range distribution will produce partition scoped deletes which will not be cleaned up
+    assumeThat(distributionMode).isNotEqualToIgnoringCase("range");
 
-    sql(
-        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
-        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+    checkUpdateFileGranularity(DeleteGranularity.FILE);
+    sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = "2";
+    validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");
 
-    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
-    append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
-    append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
-    append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
+    assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(5, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(5, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+  }
 
-    createBranchIfNeeded();
+  @Test
+  public void testUpdateUnpartitionedFileGranularityMergesDeleteFiles() {
+    // Range distribution will produce partition scoped deletes which will not be cleaned up
+    assumeThat(distributionMode).isNotEqualToIgnoringCase("range");
+    initTable("", DeleteGranularity.FILE);
+
+    sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(5);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = "4";
+    validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(4, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(4, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+
+    sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
+    table.refresh();
+    currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    expectedDeleteFilesCount = "2";
+
+    validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
+    assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(5, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(5, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+  }
+
+  private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
+    initTable("PARTITIONED BY (dep)", deleteGranularity);
 
     sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());
 
@@ -111,4 +175,19 @@ private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
             row(4, "it")),
         sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
   }
+
+  private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) {
+    createAndInitTable("id INT, dep STRING", partitionedBy, null /* empty */);
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
+
+    createBranchIfNeeded();
+  }
 }
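For reference, a minimal Spark SQL sketch of the setup the tests above exercise (the table name db.tbl is a placeholder; the property keys spell out the TableProperties constants used in the tests):

    CREATE TABLE db.tbl (id INT, data STRING) USING iceberg
    PARTITIONED BY (id)
    TBLPROPERTIES (
      'format-version' = '2',
      'write.delete.mode' = 'merge-on-read',
      'write.delete.granularity' = 'file'
    );

    DELETE FROM db.tbl WHERE id = 1 AND data = 'a';  -- writes a file-scoped position delete
    DELETE FROM db.tbl WHERE id = 1 AND data = 'c';  -- with this change, also rewrites the earlier
                                                     -- file-scoped delete for the same data file

With file granularity, each later merge-on-read DELETE or UPDATE that touches the same data file is expected to replace the earlier file-scoped delete file rather than add another one next to it, which is what the removedDeleteFiles assertions above check.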
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 66cda5b82955..18e483f23fc6 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -24,6 +24,8 @@
 import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionScanTask;
 import org.apache.iceberg.PartitionSpec;
@@ -48,6 +50,8 @@
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkV2Filters;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -158,6 +162,23 @@ public void filter(Predicate[] predicates) {
     }
   }
 
+  protected Map<String, DeleteFileSet> rewritableDeletes() {
+    Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
+
+    for (ScanTask task : tasks()) {
+      FileScanTask fileScanTask = task.asFileScanTask();
+      for (DeleteFile deleteFile : fileScanTask.deletes()) {
+        if (ContentFileUtil.isFileScoped(deleteFile)) {
+          rewritableDeletes
+              .computeIfAbsent(fileScanTask.file().location(), ignored -> DeleteFileSet.create())
+              .add(deleteFile);
+        }
+      }
+    }
+
+    return rewritableDeletes;
+  }
+
   // at this moment, Spark can only pass IN filters for a single attribute
   // if there are multiple filter attributes, Spark will pass two separate IN filters
   private Expression convertRuntimeFilters(Predicate[] predicates) {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 808764b31f44..34dba1f81499 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -28,6 +28,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -42,8 +43,12 @@
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.data.DeleteLoader;
 import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.encryption.EncryptingFileIO;
 import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -67,6 +72,7 @@
 import org.apache.iceberg.spark.SparkWriteRequirements;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
@@ -155,10 +161,23 @@ private class PositionDeltaBatchWrite implements DeltaBatchWrite {
 
     @Override
     public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
-      // broadcast the table metadata as the writer factory will be sent to executors
-      Broadcast<Table> tableBroadcast =
-          sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
-      return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
+      // broadcast large objects since the writer factory will be sent to executors
+      return new PositionDeltaWriteFactory(
+          sparkContext.broadcast(SerializableTableWithSize.copyOf(table)),
+          broadcastRewritableDeletes(),
+          command,
+          context,
+          writeProperties);
+    }
+
+    private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
+      if (context.deleteGranularity() == DeleteGranularity.FILE && scan != null) {
+        Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes();
+        if (rewritableDeletes != null && !rewritableDeletes.isEmpty()) {
+          return sparkContext.broadcast(rewritableDeletes);
+        }
+      }
+      return null;
     }
 
     @Override
@@ -174,6 +193,7 @@ public void commit(WriterCommitMessage[] messages) {
 
       int addedDataFilesCount = 0;
       int addedDeleteFilesCount = 0;
+      int removedDeleteFilesCount = 0;
 
       for (WriterCommitMessage message : messages) {
         DeltaTaskCommit taskCommit = (DeltaTaskCommit) message;
@@ -183,6 +203,11 @@ public void commit(WriterCommitMessage[] messages) {
           addedDataFilesCount += 1;
         }
 
+        for (DeleteFile deleteFile : taskCommit.rewrittenDeleteFiles()) {
+          rowDelta.removeDeletes(deleteFile);
+          removedDeleteFilesCount += 1;
+        }
+
         for (DeleteFile deleteFile : taskCommit.deleteFiles()) {
           rowDelta.addDeletes(deleteFile);
           addedDeleteFilesCount += 1;
@@ -216,10 +241,11 @@ public void commit(WriterCommitMessage[] messages) {
 
       String commitMsg =
           String.format(
-              "position delta with %d data files and %d delete files "
+              "position delta with %d data files, %d delete files and %d rewritten delete files"
                  + "(scanSnapshotId: %d, conflictDetectionFilter: %s, isolationLevel: %s)",
              addedDataFilesCount,
              addedDeleteFilesCount,
+              removedDeleteFilesCount,
              scan.snapshotId(),
              conflictDetectionFilter,
              isolationLevel);
@@ -303,18 +329,21 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
   public static class DeltaTaskCommit implements WriterCommitMessage {
     private final DataFile[] dataFiles;
     private final DeleteFile[] deleteFiles;
+    private final DeleteFile[] rewrittenDeleteFiles;
     private final CharSequence[] referencedDataFiles;
 
     DeltaTaskCommit(WriteResult result) {
       this.dataFiles = result.dataFiles();
       this.deleteFiles = result.deleteFiles();
       this.referencedDataFiles = result.referencedDataFiles();
+      this.rewrittenDeleteFiles = result.rewrittenDeleteFiles();
     }
 
     DeltaTaskCommit(DeleteWriteResult result) {
       this.dataFiles = new DataFile[0];
       this.deleteFiles = result.deleteFiles().toArray(new DeleteFile[0]);
       this.referencedDataFiles = result.referencedDataFiles().toArray(new CharSequence[0]);
+      this.rewrittenDeleteFiles = result.rewrittenDeleteFiles().toArray(new DeleteFile[0]);
     }
 
     DataFile[] dataFiles() {
@@ -325,6 +354,10 @@ DeleteFile[] deleteFiles() {
       return deleteFiles;
     }
 
+    DeleteFile[] rewrittenDeleteFiles() {
+      return rewrittenDeleteFiles;
+    }
+
     CharSequence[] referencedDataFiles() {
       return referencedDataFiles;
     }
@@ -332,16 +365,19 @@ CharSequence[] referencedDataFiles() {
 
   private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
     private final Broadcast<Table> tableBroadcast;
+    private final Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast;
     private final Command command;
     private final Context context;
    private final Map<String, String> writeProperties;
 
     PositionDeltaWriteFactory(
         Broadcast<Table> tableBroadcast,
+        Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast,
         Command command,
         Context context,
         Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
+      this.rewritableDeletesBroadcast = rewritableDeletesBroadcast;
       this.command = command;
       this.context = context;
       this.writeProperties = writeProperties;
@@ -374,17 +410,22 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
               .build();
 
       if (command == DELETE) {
-        return new DeleteOnlyDeltaWriter(table, writerFactory, deleteFileFactory, context);
+        return new DeleteOnlyDeltaWriter(
+            table, rewritableDeletes(), writerFactory, deleteFileFactory, context);
 
       } else if (table.spec().isUnpartitioned()) {
         return new UnpartitionedDeltaWriter(
-            table, writerFactory, dataFileFactory, deleteFileFactory, context);
+            table, rewritableDeletes(), writerFactory, dataFileFactory, deleteFileFactory, context);
 
       } else {
         return new PartitionedDeltaWriter(
-            table, writerFactory, dataFileFactory, deleteFileFactory, context);
+            table, rewritableDeletes(), writerFactory, dataFileFactory, deleteFileFactory, context);
       }
     }
+
+    private Map<String, DeleteFileSet> rewritableDeletes() {
+      return rewritableDeletesBroadcast != null ? rewritableDeletesBroadcast.getValue() : null;
+    }
   }
 
   private abstract static class BaseDeltaWriter implements DeltaWriter<InternalRow> {
@@ -427,23 +468,58 @@ protected PartitioningWriter<InternalRow, DataWriteResult> newDataWriter(
     // use a fanout writer if the input is unordered no matter whether fanout writers are enabled
     // clustered writers assume that the position deletes are already ordered by file and position
     protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> newDeleteWriter(
-        Table table, SparkFileWriterFactory writers, OutputFileFactory files, Context context) {
+        Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
+        SparkFileWriterFactory writers,
+        OutputFileFactory files,
+        Context context) {
 
       FileIO io = table.io();
       boolean inputOrdered = context.inputOrdered();
       long targetFileSize = context.targetDeleteFileSize();
       DeleteGranularity deleteGranularity = context.deleteGranularity();
 
-      if (inputOrdered) {
+      if (inputOrdered && rewritableDeletes == null) {
         return new ClusteredPositionDeleteWriter<>(
             writers, files, io, targetFileSize, deleteGranularity);
       } else {
         return new FanoutPositionOnlyDeleteWriter<>(
-            writers, files, io, targetFileSize, deleteGranularity);
+            writers,
+            files,
+            io,
+            targetFileSize,
+            deleteGranularity,
+            rewritableDeletes != null
+                ? new PreviousDeleteLoader(table, rewritableDeletes)
+                : path -> null /* no previous file scoped deletes */);
       }
     }
   }
 
+  private static class PreviousDeleteLoader implements Function<CharSequence, PositionDeleteIndex> {
+    private final Map<String, DeleteFileSet> deleteFiles;
+    private final DeleteLoader deleteLoader;
+
+    PreviousDeleteLoader(Table table, Map<String, DeleteFileSet> deleteFiles) {
+      this.deleteFiles = deleteFiles;
+      this.deleteLoader =
+          new BaseDeleteLoader(
+              deleteFile ->
+                  EncryptingFileIO.combine(table.io(), table.encryption())
+                      .newInputFile(deleteFile));
+    }
+
+    @Override
+    public PositionDeleteIndex apply(CharSequence path) {
+      DeleteFileSet deleteFileSet = deleteFiles.get(path.toString());
+      if (deleteFileSet == null) {
+        return null;
+      }
+
+      return deleteLoader.loadPositionDeletes(deleteFileSet, path);
+    }
+  }
+
   private static class DeleteOnlyDeltaWriter extends BaseDeltaWriter {
     private final PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> delegate;
     private final PositionDelete<InternalRow> positionDelete;
@@ -460,11 +536,13 @@ private static class DeleteOnlyDeltaWriter extends BaseDeltaWriter {
     DeleteOnlyDeltaWriter(
         Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
         SparkFileWriterFactory writerFactory,
         OutputFileFactory deleteFileFactory,
         Context context) {
-      this.delegate = newDeleteWriter(table, writerFactory, deleteFileFactory, context);
+      this.delegate =
+          newDeleteWriter(table, rewritableDeletes, writerFactory, deleteFileFactory, context);
       this.positionDelete = PositionDelete.create();
       this.io = table.io();
       this.specs = table.specs();
@@ -547,6 +625,7 @@ private abstract static class DeleteAndDataDeltaWriter extends BaseDeltaWriter {
     DeleteAndDataDeltaWriter(
         Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
         SparkFileWriterFactory writerFactory,
         OutputFileFactory dataFileFactory,
         OutputFileFactory deleteFileFactory,
@@ -554,7 +633,7 @@ private abstract static class DeleteAndDataDeltaWriter extends BaseDeltaWriter {
       this.delegate =
           new BasePositionDeltaWriter<>(
               newDataWriter(table, writerFactory, dataFileFactory, context),
-              newDeleteWriter(table, writerFactory, deleteFileFactory, context));
+              newDeleteWriter(table, rewritableDeletes, writerFactory, deleteFileFactory, context));
       this.io = table.io();
       this.specs = table.specs();
 
@@ -619,11 +698,12 @@ private static class UnpartitionedDeltaWriter extends DeleteAndDataDeltaWriter {
     UnpartitionedDeltaWriter(
         Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
         SparkFileWriterFactory writerFactory,
         OutputFileFactory dataFileFactory,
         OutputFileFactory deleteFileFactory,
         Context context) {
-      super(table, writerFactory, dataFileFactory, deleteFileFactory, context);
+      super(table, rewritableDeletes, writerFactory, dataFileFactory, deleteFileFactory, context);
       this.dataSpec = table.spec();
     }
 
@@ -645,11 +725,12 @@ private static class PartitionedDeltaWriter extends DeleteAndDataDeltaWriter {
     PartitionedDeltaWriter(
         Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
         SparkFileWriterFactory writerFactory,
         OutputFileFactory dataFileFactory,
         OutputFileFactory deleteFileFactory,
         Context context) {
-      super(table, writerFactory, dataFileFactory, deleteFileFactory, context);
+      super(table, rewritableDeletes, writerFactory, dataFileFactory, deleteFileFactory, context);
       this.dataSpec = table.spec();
       this.dataPartitionKey = new PartitionKey(dataSpec, context.dataSchema());