diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java index e7675c357bf1..a41f1a7d643b 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java +++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java @@ -39,7 +39,12 @@ public void delete(long posStart, long posEnd) { } @Override - public boolean deleted(long position) { + public boolean isDeleted(long position) { return roaring64Bitmap.contains(position); } + + @Override + public boolean isEmpty() { + return roaring64Bitmap.isEmpty(); + } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 6bb51c7fdead..af5d95339383 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; -import java.util.Set; import java.util.function.Function; import org.apache.iceberg.Accessor; import org.apache.iceberg.MetadataColumns; @@ -35,7 +34,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; @@ -66,7 +64,7 @@ public static CloseableIterable filter(CloseableIterable rows, Functio } public static CloseableIterable filter(CloseableIterable rows, Function rowToPosition, - Set deleteSet) { + PositionDeleteIndex deleteSet) { if (deleteSet.isEmpty()) { return rows; } @@ -85,35 +83,15 @@ public static StructLikeSet toEqualitySet(CloseableIterable eqDelete } } - public static Set toPositionSet(CharSequence dataLocation, CloseableIterable deleteFile) { - return toPositionSet(dataLocation, ImmutableList.of(deleteFile)); - } - - public static Set toPositionSet(CharSequence dataLocation, - List> deleteFiles) { - DataFileFilter locationFilter = new DataFileFilter<>(dataLocation); - List> positions = Lists.transform(deleteFiles, deletes -> - CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row))); - return toPositionSet(CloseableIterable.concat(positions)); - } - - public static Set toPositionSet(CloseableIterable posDeletes) { - try (CloseableIterable deletes = posDeletes) { - return Sets.newHashSet(deletes); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close position delete source", e); - } - } - - public static PositionDeleteIndex toPositionBitmap(CharSequence dataLocation, - List> deleteFiles) { + public static PositionDeleteIndex toPositionIndex(CharSequence dataLocation, + List> deleteFiles) { DataFileFilter locationFilter = new DataFileFilter<>(dataLocation); List> positions = Lists.transform(deleteFiles, deletes -> CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row))); - return toPositionBitmap(CloseableIterable.concat(positions)); + return toPositionIndex(CloseableIterable.concat(positions)); } - public static PositionDeleteIndex toPositionBitmap(CloseableIterable posDeletes) { + public static PositionDeleteIndex toPositionIndex(CloseableIterable posDeletes) { try (CloseableIterable deletes = posDeletes) { PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(); deletes.forEach(positionDeleteIndex::delete); @@ -161,16 +139,16 @@ protected boolean shouldKeep(T row) { private static class PositionSetDeleteFilter extends Filter { private final Function rowToPosition; - private final Set deleteSet; + private final PositionDeleteIndex deleteSet; - private PositionSetDeleteFilter(Function rowToPosition, Set deleteSet) { + private PositionSetDeleteFilter(Function rowToPosition, PositionDeleteIndex deleteSet) { this.rowToPosition = rowToPosition; this.deleteSet = deleteSet; } @Override protected boolean shouldKeep(T row) { - return !deleteSet.contains(rowToPosition.apply(row)); + return !deleteSet.isDeleted(rowToPosition.apply(row)); } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java index 73ef397a9450..ced11b8acd19 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java @@ -38,5 +38,10 @@ public interface PositionDeleteIndex { * @param position deleted row position * @return whether the position is deleted */ - boolean deleted(long position); + boolean isDeleted(long position); + + /** + * Returns true if this collection contains no element. + */ + boolean isEmpty(); } diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java index e6e5703df7cf..65e61ac688bd 100644 --- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java +++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java @@ -218,7 +218,7 @@ public void testPositionSetRowFilter() { CloseableIterable actual = Deletes.filter( rows, row -> row.get(0, Long.class), - Deletes.toPositionSet(deletes)); + Deletes.toPositionIndex(deletes)); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(1L, 2L, 5L, 6L, 8L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); @@ -256,7 +256,7 @@ public void testCombinedPositionSetRowFilter() { CloseableIterable actual = Deletes.filter( rows, row -> row.get(0, Long.class), - Deletes.toPositionSet("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))); + Deletes.toPositionIndex("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(1L, 2L, 5L, 6L, 8L), diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 6901859600f4..68b92ba782e7 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -214,7 +214,7 @@ public PositionDeleteIndex deletedRowPositions() { if (deleteRowPositions == null) { List> deletes = Lists.transform(posDeletes, this::openPosDeletes); - deleteRowPositions = Deletes.toPositionBitmap(dataFile.path(), deletes); + deleteRowPositions = Deletes.toPositionIndex(dataFile.path(), deletes); } return deleteRowPositions; } @@ -228,9 +228,7 @@ private CloseableIterable applyPosDeletes(CloseableIterable records) { // if there are fewer deletes than a reasonable number to keep in memory, use a set if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) { - return Deletes.filter( - records, this::pos, - Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes))); + return Deletes.filter(records, this::pos, Deletes.toPositionIndex(dataFile.path(), deletes)); } return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes)); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index e74a947e835c..e18058130eb2 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -159,7 +159,7 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit int originalRowId = 0; int currentRowId = 0; while (originalRowId < numRowsToRead) { - if (!deletedRowPositions.deleted(originalRowId + rowStartPosInBatch)) { + if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) { posDelRowIdMapping[currentRowId] = originalRowId; currentRowId++; } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java index 8e42c6e3d15e..a349d84cff18 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java @@ -217,9 +217,14 @@ public void delete(long posStart, long posEnd) { } @Override - public boolean deleted(long position) { + public boolean isDeleted(long position) { return deleteIndex.contains(position); } + + @Override + public boolean isEmpty() { + return deleteIndex.isEmpty(); + } } @Test