From 19c1779f845e89135677e599ed852589ed47800d Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 25 Sep 2024 16:59:29 -0700 Subject: [PATCH 1/2] Core: Support combining position deletes during writes --- .../deletes/BitmapPositionDeleteIndex.java | 25 +++- .../org/apache/iceberg/deletes/Deletes.java | 32 ++++- .../iceberg/deletes/PositionDelete.java | 7 ++ .../iceberg/deletes/PositionDeleteIndex.java | 15 +++ .../SortingPositionOnlyDeleteWriter.java | 62 ++++++++-- .../io/FanoutPositionOnlyDeleteWriter.java | 24 +++- .../java/org/apache/iceberg/TestBase.java | 4 + .../apache/iceberg/data/BaseDeleteLoader.java | 4 +- .../iceberg/io/TestPartitioningWriters.java | 116 ++++++++++++++++++ 9 files changed, 268 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java index 3a044878562a..a1b57a38666d 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java +++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java @@ -18,18 +18,35 @@ */ package org.apache.iceberg.deletes; +import java.util.Collection; +import java.util.List; import java.util.function.LongConsumer; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.roaringbitmap.longlong.Roaring64Bitmap; class BitmapPositionDeleteIndex implements PositionDeleteIndex { private final Roaring64Bitmap roaring64Bitmap; + private final List deleteFiles; BitmapPositionDeleteIndex() { - roaring64Bitmap = new Roaring64Bitmap(); + this.roaring64Bitmap = new Roaring64Bitmap(); + this.deleteFiles = Lists.newArrayList(); + } + + BitmapPositionDeleteIndex(Collection deleteFiles) { + this.roaring64Bitmap = new Roaring64Bitmap(); + this.deleteFiles = Lists.newArrayList(deleteFiles); + } + + BitmapPositionDeleteIndex(DeleteFile deleteFile) { + this.roaring64Bitmap = new Roaring64Bitmap(); + this.deleteFiles = deleteFile != null ? Lists.newArrayList(deleteFile) : Lists.newArrayList(); } void merge(BitmapPositionDeleteIndex that) { roaring64Bitmap.or(that.roaring64Bitmap); + deleteFiles.addAll(that.deleteFiles); } @Override @@ -48,6 +65,7 @@ public void merge(PositionDeleteIndex that) { merge((BitmapPositionDeleteIndex) that); } else { that.forEach(this::delete); + deleteFiles.addAll(that.deleteFiles()); } } @@ -65,4 +83,9 @@ public boolean isEmpty() { public void forEach(LongConsumer consumer) { roaring64Bitmap.forEach(consumer::accept); } + + @Override + public Collection deleteFiles() { + return deleteFiles; + } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index a72e01613040..2256b378f62a 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -26,6 +26,7 @@ import java.util.function.Function; import java.util.function.Predicate; import org.apache.iceberg.Accessor; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; @@ -123,6 +124,11 @@ public static StructLikeSet toEqualitySet( } } + public static CharSequenceMap toPositionIndexes( + CloseableIterable posDeletes) { + return toPositionIndexes(posDeletes, null /* unknown delete file */); + } + /** * Builds a map of position delete indexes by path. * @@ -131,10 +137,11 @@ public static StructLikeSet toEqualitySet( * entire delete file content is needed (e.g. caching). * * @param posDeletes position deletes + * @param file the source delete file for the deletes * @return the map of position delete indexes by path */ public static CharSequenceMap toPositionIndexes( - CloseableIterable posDeletes) { + CloseableIterable posDeletes, DeleteFile file) { CharSequenceMap indexes = CharSequenceMap.create(); try (CloseableIterable deletes = posDeletes) { @@ -142,7 +149,7 @@ public static CharSequenceMap toPosi CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete); long position = (long) POSITION_ACCESSOR.get(delete); PositionDeleteIndex index = - indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex()); + indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex(file)); index.delete(position); } } catch (IOException e) { @@ -152,6 +159,20 @@ public static CharSequenceMap toPosi return indexes; } + public static PositionDeleteIndex toPositionIndex( + CharSequence dataLocation, CloseableIterable posDeletes, DeleteFile file) { + CloseableIterable positions = extractPositions(dataLocation, posDeletes); + List files = ImmutableList.of(file); + return toPositionIndex(positions, files); + } + + private static CloseableIterable extractPositions( + CharSequence dataLocation, CloseableIterable rows) { + DataFileFilter filter = new DataFileFilter<>(dataLocation); + CloseableIterable filteredRows = filter.filter(rows); + return CloseableIterable.transform(filteredRows, row -> (Long) POSITION_ACCESSOR.get(row)); + } + public static PositionDeleteIndex toPositionIndex( CharSequence dataLocation, List> deleteFiles) { return toPositionIndex(dataLocation, deleteFiles, ThreadPools.getDeleteWorkerPool()); @@ -176,8 +197,13 @@ public static PositionDeleteIndex toPositionIndex( } public static PositionDeleteIndex toPositionIndex(CloseableIterable posDeletes) { + return toPositionIndex(posDeletes, ImmutableList.of()); + } + + private static PositionDeleteIndex toPositionIndex( + CloseableIterable posDeletes, List files) { try (CloseableIterable deletes = posDeletes) { - PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(); + PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(files); deletes.forEach(positionDeleteIndex::delete); return positionDeleteIndex; } catch (IOException e) { diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java index 655428ce7713..57e188567f68 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java @@ -31,6 +31,13 @@ public static PositionDelete create() { private PositionDelete() {} + public PositionDelete set(CharSequence newPath, long newPos) { + this.path = newPath; + this.pos = newPos; + this.row = null; + return this; + } + public PositionDelete set(CharSequence newPath, long newPos, R newRow) { this.path = newPath; this.pos = newPos; diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java index c0086fe6aa2e..3655b8b7e8eb 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java @@ -18,7 +18,10 @@ */ package org.apache.iceberg.deletes; +import java.util.Collection; import java.util.function.LongConsumer; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; public interface PositionDeleteIndex { /** @@ -42,6 +45,9 @@ public interface PositionDeleteIndex { * @param that the other index to merge */ default void merge(PositionDeleteIndex that) { + if (!that.deleteFiles().isEmpty()) { + throw new UnsupportedOperationException(getClass().getName() + " does not support merge"); + } that.forEach(this::delete); } @@ -72,6 +78,15 @@ default void forEach(LongConsumer consumer) { } } + /** + * Returns delete files that this index was created from or an empty collection if unknown. + * + * @return delete files that this index was created from + */ + default Collection deleteFiles() { + return ImmutableList.of(); + } + /** Returns an empty immutable position delete index. */ static PositionDeleteIndex empty() { return EmptyPositionDeleteIndex.get(); diff --git a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java index 1d4d131dfe6f..16b0fdf469ae 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java @@ -21,17 +21,18 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.io.DeleteWriteResult; import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceMap; import org.apache.iceberg.util.CharSequenceSet; -import org.roaringbitmap.longlong.PeekableLongIterator; -import org.roaringbitmap.longlong.Roaring64Bitmap; +import org.apache.iceberg.util.ContentFileUtil; /** * A position delete writer that is capable of handling unordered deletes without rows. @@ -41,6 +42,11 @@ * records are not ordered by file and position as required by the spec. If the incoming deletes are * ordered by an external process, use {@link PositionDeleteWriter} instead. * + *

If configured, this writer can also load previous deletes using the provided function and + * merge them with incoming ones prior to flushing the deletes into a file. Callers must ensure only + * previous file-scoped deletes are loaded because partition-scoped deletes can apply to multiple + * data files and can't be safely discarded. + * *

Note this writer stores only positions. It does not store deleted records. */ public class SortingPositionOnlyDeleteWriter @@ -48,7 +54,8 @@ public class SortingPositionOnlyDeleteWriter private final Supplier, DeleteWriteResult>> writers; private final DeleteGranularity granularity; - private final CharSequenceMap positionsByPath; + private final CharSequenceMap positionsByPath; + private final Function loadPreviousDeletes; private DeleteWriteResult result = null; public SortingPositionOnlyDeleteWriter(FileWriter, DeleteWriteResult> writer) { @@ -58,17 +65,26 @@ public SortingPositionOnlyDeleteWriter(FileWriter, DeleteWrite public SortingPositionOnlyDeleteWriter( Supplier, DeleteWriteResult>> writers, DeleteGranularity granularity) { + this(writers, granularity, null /* no access to previous deletes */); + } + + public SortingPositionOnlyDeleteWriter( + Supplier, DeleteWriteResult>> writers, + DeleteGranularity granularity, + Function loadPreviousDeletes) { this.writers = writers; this.granularity = granularity; this.positionsByPath = CharSequenceMap.create(); + this.loadPreviousDeletes = loadPreviousDeletes; } @Override public void write(PositionDelete positionDelete) { CharSequence path = positionDelete.path(); long position = positionDelete.pos(); - Roaring64Bitmap positions = positionsByPath.computeIfAbsent(path, Roaring64Bitmap::new); - positions.add(position); + PositionDeleteIndex positions = + positionsByPath.computeIfAbsent(path, key -> new BitmapPositionDeleteIndex()); + positions.delete(position); } @Override @@ -106,14 +122,16 @@ private DeleteWriteResult writePartitionDeletes() throws IOException { private DeleteWriteResult writeFileDeletes() throws IOException { List deleteFiles = Lists.newArrayList(); CharSequenceSet referencedDataFiles = CharSequenceSet.empty(); + List rewrittenDeleteFiles = Lists.newArrayList(); for (CharSequence path : positionsByPath.keySet()) { DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path)); deleteFiles.addAll(writeResult.deleteFiles()); referencedDataFiles.addAll(writeResult.referencedDataFiles()); + rewrittenDeleteFiles.addAll(writeResult.rewrittenDeleteFiles()); } - return new DeleteWriteResult(deleteFiles, referencedDataFiles); + return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFiles); } @SuppressWarnings("CollectionUndefinedEquality") @@ -123,22 +141,42 @@ private DeleteWriteResult writeDeletes(Collection paths) throws IO } FileWriter, DeleteWriteResult> writer = writers.get(); + List rewrittenDeleteFile = Lists.newArrayList(); try { PositionDelete positionDelete = PositionDelete.create(); for (CharSequence path : sort(paths)) { - // the iterator provides values in ascending sorted order - PeekableLongIterator positions = positionsByPath.get(path).getLongIterator(); - while (positions.hasNext()) { - long position = positions.next(); - writer.write(positionDelete.set(path, position, null /* no row */)); + PositionDeleteIndex positions = positionsByPath.get(path); + PositionDeleteIndex previousPositions = loadPreviousDeletes(path); + if (previousPositions != null && previousPositions.isNotEmpty()) { + validatePreviousDeletes(previousPositions); + positions.merge(previousPositions); + rewrittenDeleteFile.addAll(previousPositions.deleteFiles()); } + positions.forEach(position -> writer.write(positionDelete.set(path, position))); } } finally { writer.close(); } - return writer.result(); + DeleteWriteResult writerResult = writer.result(); + List deleteFiles = writerResult.deleteFiles(); + CharSequenceSet referencedDataFiles = writerResult.referencedDataFiles(); + return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFile); + } + + private void validatePreviousDeletes(PositionDeleteIndex index) { + Preconditions.checkArgument( + index.deleteFiles().stream().allMatch(this::isFileScoped), + "Previous deletes must be file-scoped"); + } + + private boolean isFileScoped(DeleteFile deleteFile) { + return ContentFileUtil.referencedDataFile(deleteFile) != null; + } + + private PositionDeleteIndex loadPreviousDeletes(CharSequence path) { + return loadPreviousDeletes != null ? loadPreviousDeletes.apply(path) : null; } private Collection sort(Collection paths) { diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java index c6a55064b756..ff1fb917b172 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java @@ -19,11 +19,13 @@ package org.apache.iceberg.io; import java.util.List; +import java.util.function.Function; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.CharSequenceSet; @@ -45,13 +47,15 @@ public class FanoutPositionOnlyDeleteWriter private final DeleteGranularity granularity; private final List deleteFiles; private final CharSequenceSet referencedDataFiles; + private final List rewrittenDeleteFiles; + private final Function loadPreviousDeletes; public FanoutPositionOnlyDeleteWriter( FileWriterFactory writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes) { - this(writerFactory, fileFactory, io, targetFileSizeInBytes, DeleteGranularity.PARTITION); + this(writerFactory, fileFactory, io, targetFileSizeInBytes, DeleteGranularity.PARTITION, null); } public FanoutPositionOnlyDeleteWriter( @@ -60,6 +64,16 @@ public FanoutPositionOnlyDeleteWriter( FileIO io, long targetFileSizeInBytes, DeleteGranularity granularity) { + this(writerFactory, fileFactory, io, targetFileSizeInBytes, granularity, null); + } + + public FanoutPositionOnlyDeleteWriter( + FileWriterFactory writerFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSizeInBytes, + DeleteGranularity granularity, + Function loadPreviousDeletes) { this.writerFactory = writerFactory; this.fileFactory = fileFactory; this.io = io; @@ -67,6 +81,8 @@ public FanoutPositionOnlyDeleteWriter( this.granularity = granularity; this.deleteFiles = Lists.newArrayList(); this.referencedDataFiles = CharSequenceSet.empty(); + this.rewrittenDeleteFiles = Lists.newArrayList(); + this.loadPreviousDeletes = loadPreviousDeletes; } @Override @@ -76,17 +92,19 @@ protected FileWriter, DeleteWriteResult> newWriter( () -> new RollingPositionDeleteWriter<>( writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition), - granularity); + granularity, + loadPreviousDeletes); } @Override protected void addResult(DeleteWriteResult result) { deleteFiles.addAll(result.deleteFiles()); referencedDataFiles.addAll(result.referencedDataFiles()); + rewrittenDeleteFiles.addAll(result.rewrittenDeleteFiles()); } @Override protected DeleteWriteResult aggregatedResult() { - return new DeleteWriteResult(deleteFiles, referencedDataFiles); + return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFiles); } } diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index a0b52b346bf3..f3bbb7979547 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -665,6 +665,10 @@ protected DeleteFile newEqualityDeleteFile(int specId, String partitionPath, int .build(); } + protected PositionDelete positionDelete(CharSequence path, long pos) { + return positionDelete(path, pos, null /* no row */); + } + protected PositionDelete positionDelete(CharSequence path, long pos, T row) { PositionDelete positionDelete = PositionDelete.create(); return positionDelete.set(path, pos, row); diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java index 91b7fd1c1dc1..8a1ebf95abeb 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java @@ -166,13 +166,13 @@ private PositionDeleteIndex getOrReadPosDeletes(DeleteFile deleteFile, CharSeque private CharSequenceMap readPosDeletes(DeleteFile deleteFile) { CloseableIterable deletes = openDeletes(deleteFile, POS_DELETE_SCHEMA); - return Deletes.toPositionIndexes(deletes); + return Deletes.toPositionIndexes(deletes, deleteFile); } private PositionDeleteIndex readPosDeletes(DeleteFile deleteFile, CharSequence filePath) { Expression filter = Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath); CloseableIterable deletes = openDeletes(deleteFile, POS_DELETE_SCHEMA, filter); - return Deletes.toPositionIndex(filePath, ImmutableList.of(deletes)); + return Deletes.toPositionIndex(filePath, deletes, deleteFile); } private CloseableIterable openDeletes(DeleteFile deleteFile, Schema projection) { diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java index 8dc031314eda..3e36099aa8e3 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -20,13 +20,17 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.function.Function; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; @@ -34,10 +38,16 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.BaseDeleteLoader; +import org.apache.iceberg.data.DeleteLoader; import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.ContentFileUtil; import org.apache.iceberg.util.StructLikeSet; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; @@ -719,4 +729,110 @@ private void checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity de List expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa")); assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows)); } + + @TestTemplate + public void testRewriteOfPreviousDeletes() throws IOException { + assumeThat(format()).isIn(FileFormat.PARQUET, FileFormat.ORC); + + FileWriterFactory writerFactory = newWriterFactory(table.schema()); + + // add the first data file + List rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(11, "aaa")); + DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null); + table.newFastAppend().appendFile(dataFile1).commit(); + + // add the second data file + List rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"), toRow(12, "aaa")); + DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), null); + table.newFastAppend().appendFile(dataFile2).commit(); + + PartitionSpec spec = table.spec(); + + // init the first delete writer without access to previous deletes + FanoutPositionOnlyDeleteWriter writer1 = + new FanoutPositionOnlyDeleteWriter<>( + writerFactory, + fileFactory, + table.io(), + TARGET_FILE_SIZE, + DeleteGranularity.FILE, + null /* no previous deletes */); + + // write initial deletes for both data files + writer1.write(positionDelete(dataFile1.path(), 1L), spec, null); + writer1.write(positionDelete(dataFile2.path(), 1L), spec, null); + writer1.close(); + + // verify the writer result + DeleteWriteResult result1 = writer1.result(); + assertThat(result1.deleteFiles()).hasSize(2); + assertThat(result1.referencedDataFiles()).hasSize(2); + assertThat(result1.referencesDataFiles()).isTrue(); + assertThat(result1.rewrittenDeleteFiles()).isEmpty(); + + // commit the initial deletes + RowDelta rowDelta1 = table.newRowDelta(); + result1.deleteFiles().forEach(rowDelta1::addDeletes); + rowDelta1.commit(); + + // verify correctness of the first delete operation + List expectedRows1 = + ImmutableList.of(toRow(1, "aaa"), toRow(3, "aaa"), toRow(11, "aaa"), toRow(12, "aaa")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows1)); + + // populate previous delete mapping + Map previousDeletes = Maps.newHashMap(); + for (DeleteFile deleteFile : result1.deleteFiles()) { + String dataLocation = ContentFileUtil.referencedDataFile(deleteFile).toString(); + previousDeletes.put(dataLocation, deleteFile); + } + + // init the second delete writer with access to previous deletes + FanoutPositionOnlyDeleteWriter writer2 = + new FanoutPositionOnlyDeleteWriter<>( + writerFactory, + fileFactory, + table.io(), + TARGET_FILE_SIZE, + DeleteGranularity.FILE, + new PreviousDeleteLoader(table, previousDeletes)); + + // write more deletes for both data files + writer2.write(positionDelete(dataFile1.path(), 0L), spec, null); + writer2.write(positionDelete(dataFile2.path(), 0L), spec, null); + writer2.close(); + + // verify the writer result + DeleteWriteResult result2 = writer2.result(); + assertThat(result2.deleteFiles()).hasSize(2); + assertThat(result2.referencedDataFiles()).hasSize(2); + assertThat(result2.referencesDataFiles()).isTrue(); + assertThat(result2.rewrittenDeleteFiles()).hasSize(2); + + // add new and remove rewritten delete files + RowDelta rowDelta2 = table.newRowDelta(); + result2.deleteFiles().forEach(rowDelta2::addDeletes); + result2.rewrittenDeleteFiles().forEach(rowDelta2::removeDeletes); + rowDelta2.commit(); + + // verify correctness of the second delete operation + List expectedRows2 = ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa")); + assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows2)); + } + + private static class PreviousDeleteLoader implements Function { + private final Map deleteFiles; + private final DeleteLoader deleteLoader; + + PreviousDeleteLoader(Table table, Map deleteFiles) { + this.deleteFiles = deleteFiles; + this.deleteLoader = new BaseDeleteLoader(deleteFile -> table.io().newInputFile(deleteFile)); + } + + @Override + public PositionDeleteIndex apply(CharSequence path) { + DeleteFile deleteFile = deleteFiles.get(path); + return deleteLoader.loadPositionDeletes(ImmutableList.of(deleteFile), path); + } + } } From 49719900b2516c5b2ebaf149e09a58f3a8da06ee Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 30 Sep 2024 16:32:29 -0700 Subject: [PATCH 2/2] Review feedback --- .../deletes/SortingPositionOnlyDeleteWriter.java | 14 +++++--------- .../iceberg/io/FanoutPositionOnlyDeleteWriter.java | 10 ++++++++-- .../apache/iceberg/io/TestPartitioningWriters.java | 7 +------ 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java index 16b0fdf469ae..818529c02479 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java @@ -65,7 +65,7 @@ public SortingPositionOnlyDeleteWriter(FileWriter, DeleteWrite public SortingPositionOnlyDeleteWriter( Supplier, DeleteWriteResult>> writers, DeleteGranularity granularity) { - this(writers, granularity, null /* no access to previous deletes */); + this(writers, granularity, path -> null /* no access to previous deletes */); } public SortingPositionOnlyDeleteWriter( @@ -141,17 +141,17 @@ private DeleteWriteResult writeDeletes(Collection paths) throws IO } FileWriter, DeleteWriteResult> writer = writers.get(); - List rewrittenDeleteFile = Lists.newArrayList(); + List rewrittenDeleteFiles = Lists.newArrayList(); try { PositionDelete positionDelete = PositionDelete.create(); for (CharSequence path : sort(paths)) { PositionDeleteIndex positions = positionsByPath.get(path); - PositionDeleteIndex previousPositions = loadPreviousDeletes(path); + PositionDeleteIndex previousPositions = loadPreviousDeletes.apply(path); if (previousPositions != null && previousPositions.isNotEmpty()) { validatePreviousDeletes(previousPositions); positions.merge(previousPositions); - rewrittenDeleteFile.addAll(previousPositions.deleteFiles()); + rewrittenDeleteFiles.addAll(previousPositions.deleteFiles()); } positions.forEach(position -> writer.write(positionDelete.set(path, position))); } @@ -162,7 +162,7 @@ private DeleteWriteResult writeDeletes(Collection paths) throws IO DeleteWriteResult writerResult = writer.result(); List deleteFiles = writerResult.deleteFiles(); CharSequenceSet referencedDataFiles = writerResult.referencedDataFiles(); - return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFile); + return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFiles); } private void validatePreviousDeletes(PositionDeleteIndex index) { @@ -175,10 +175,6 @@ private boolean isFileScoped(DeleteFile deleteFile) { return ContentFileUtil.referencedDataFile(deleteFile) != null; } - private PositionDeleteIndex loadPreviousDeletes(CharSequence path) { - return loadPreviousDeletes != null ? loadPreviousDeletes.apply(path) : null; - } - private Collection sort(Collection paths) { if (paths.size() <= 1) { return paths; diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java index ff1fb917b172..9c527f4b32e5 100644 --- a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java @@ -55,7 +55,7 @@ public FanoutPositionOnlyDeleteWriter( OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes) { - this(writerFactory, fileFactory, io, targetFileSizeInBytes, DeleteGranularity.PARTITION, null); + this(writerFactory, fileFactory, io, targetFileSizeInBytes, DeleteGranularity.PARTITION); } public FanoutPositionOnlyDeleteWriter( @@ -64,7 +64,13 @@ public FanoutPositionOnlyDeleteWriter( FileIO io, long targetFileSizeInBytes, DeleteGranularity granularity) { - this(writerFactory, fileFactory, io, targetFileSizeInBytes, granularity, null); + this( + writerFactory, + fileFactory, + io, + targetFileSizeInBytes, + granularity, + path -> null /* no access to previous deletes */); } public FanoutPositionOnlyDeleteWriter( diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java index 3e36099aa8e3..1c8453bd6a75 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -751,12 +751,7 @@ public void testRewriteOfPreviousDeletes() throws IOException { // init the first delete writer without access to previous deletes FanoutPositionOnlyDeleteWriter writer1 = new FanoutPositionOnlyDeleteWriter<>( - writerFactory, - fileFactory, - table.io(), - TARGET_FILE_SIZE, - DeleteGranularity.FILE, - null /* no previous deletes */); + writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, DeleteGranularity.FILE); // write initial deletes for both data files writer1.write(positionDelete(dataFile1.path(), 1L), spec, null);