diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 6d6aba2bbbe3..4d4b548869a3 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -109,13 +109,13 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable posDel public static CloseableIterable streamingFilter(CloseableIterable rows, Function rowToPosition, - CloseableIterable posDeletes) { + CloseableIterator posDeletes) { return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes); } public static CloseableIterable streamingMarker(CloseableIterable rows, Function rowToPosition, - CloseableIterable posDeletes, + CloseableIterator posDeletes, Consumer markDeleted) { return new PositionStreamDeleteMarker<>(rows, rowToPosition, posDeletes, markDeleted); } @@ -157,10 +157,10 @@ private abstract static class PositionStreamDeleteIterable extends CloseableG private long nextDeletePos; PositionStreamDeleteIterable(CloseableIterable rows, Function rowToPosition, - CloseableIterable deletePositions) { + CloseableIterator deletePositions) { this.rows = rows; this.rowToPosition = rowToPosition; - this.deletePosIterator = deletePositions.iterator(); + this.deletePosIterator = deletePositions; } @Override @@ -203,7 +203,7 @@ boolean isDeleted(T row) { private static class PositionStreamDeleteFilter extends PositionStreamDeleteIterable { private PositionStreamDeleteFilter(CloseableIterable rows, Function rowToPosition, - CloseableIterable deletePositions) { + CloseableIterator deletePositions) { super(rows, rowToPosition, deletePositions); } @@ -222,7 +222,7 @@ private static class PositionStreamDeleteMarker extends PositionStreamDeleteI private final Consumer markDeleted; PositionStreamDeleteMarker(CloseableIterable rows, Function rowToPosition, - CloseableIterable deletePositions, Consumer markDeleted) { + CloseableIterator 
deletePositions, Consumer markDeleted) { super(rows, rowToPosition, deletePositions); this.markDeleted = markDeleted; } diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java index dc255c7c4ab0..acaea76275b0 100644 --- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java +++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java @@ -112,7 +112,9 @@ public void testPositionStreamRowFilter() { CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L)); - CloseableIterable actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); + CloseableIterable actual = Deletes.streamingFilter(rows, + row -> row.get(0, Long.class), + deletes.iterator()); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(1L, 2L, 5L, 6L, 8L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); @@ -137,7 +139,7 @@ public void testPositionStreamRowDeleteMarker() { CloseableIterable actual = Deletes.streamingMarker(rows, row -> row.get(0, Long.class), /* row to position */ - deletes, + deletes.iterator(), row -> row.set(2, true) /* delete marker */ ); Assert.assertEquals("Filter should produce expected rows", @@ -163,7 +165,9 @@ public void testPositionStreamRowFilterWithDuplicates() { CloseableIterable deletes = CloseableIterable.withNoopClose( Lists.newArrayList(0L, 0L, 0L, 3L, 4L, 7L, 7L, 9L, 9L, 9L)); - CloseableIterable actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); + CloseableIterable actual = Deletes.streamingFilter(rows, + row -> row.get(0, Long.class), + deletes.iterator()); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(1L, 2L, 5L, 6L, 8L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); @@ -181,7 +185,9 @@ public void 
testPositionStreamRowFilterWithRowGaps() { CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 2L, 3L, 4L, 7L, 9L)); - CloseableIterable actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); + CloseableIterable actual = Deletes.streamingFilter(rows, + row -> row.get(0, Long.class), + deletes.iterator()); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(5L, 6L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); @@ -220,7 +226,8 @@ public void testCombinedPositionStreamRowFilter() { CloseableIterable actual = Deletes.streamingFilter( rows, row -> row.get(0, Long.class), - Deletes.deletePositions("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))); + Deletes.deletePositions("file_a.avro", + ImmutableList.of(positionDeletes1, positionDeletes2)).iterator()); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(1L, 2L, 5L, 6L, 8L), diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 90c7133cdc4e..b65b92c08c99 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -38,6 +38,7 @@ import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -72,6 +73,7 @@ public abstract class DeleteFilter { private PositionDeleteIndex deleteRowPositions = null; private List> isInDeleteSets = null; private Predicate eqDeleteRows = null; + private CloseableIterator deletePositionIte; protected DeleteFilter(String filePath, List deletes, Schema tableSchema, Schema requestedSchema) { 
this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; @@ -128,6 +130,9 @@ protected long pos(T record) { return (Long) posAccessor.get(asStructLike(record)); } + /** + * To reuse this filter across calls, close only the last CloseableIterable it returns. + */ public CloseableIterable filter(CloseableIterable records) { return applyEqDeletes(applyPosDeletes(records)); } @@ -222,18 +227,23 @@ private CloseableIterable applyPosDeletes(CloseableIterable records) { return records; } - List> deletes = Lists.transform(posDeletes, this::openPosDeletes); - // if there are fewer deletes than a reasonable number to keep in memory, use a set if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) { - PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, deletes); - Predicate isDeleted = record -> positionIndex.isDeleted(pos(record)); + if (deleteRowPositions == null) { + List> deletes = Lists.transform(posDeletes, this::openPosDeletes); + deleteRowPositions = Deletes.toPositionIndex(filePath, deletes); + } + Predicate isDeleted = record -> deleteRowPositions.isDeleted(pos(record)); return createDeleteIterable(records, isDeleted); } + if (deletePositionIte == null) { + List> deletes = Lists.transform(posDeletes, this::openPosDeletes); + deletePositionIte = Deletes.deletePositions(filePath, deletes).iterator(); + } return hasIsDeletedColumn ? - Deletes.streamingMarker(records, this::pos, Deletes.deletePositions(filePath, deletes), this::markRowDeleted) : - Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes)); + Deletes.streamingMarker(records, this::pos, deletePositionIte, this::markRowDeleted) : + Deletes.streamingFilter(records, this::pos, deletePositionIte); } private CloseableIterable createDeleteIterable(CloseableIterable records, Predicate isDeleted) {