diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index a7979fd2ed3e..2be976b3f91f 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -58,11 +58,11 @@ public abstract class DeleteFilter<T> {
   private static final Logger LOG = LoggerFactory.getLogger(DeleteFilter.class);
-  private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;
+  private static final long DEFAULT_STREAM_FILTER_THRESHOLD = 100_000L;
   private static final Schema POS_DELETE_SCHEMA =
       new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);
 
-  private final long setFilterThreshold;
+  private final long streamFilterThreshold;
   private final String filePath;
   private final List<DeleteFile> posDeletes;
   private final List<DeleteFile> eqDeletes;
@@ -82,7 +82,12 @@ protected DeleteFilter(
       Schema tableSchema,
       Schema requestedSchema,
       DeleteCounter counter) {
-    this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
+    // For testing purposes only, we may set the stream filter threshold via a system property
+    String testingThreshold = System.getProperty("iceberg.stream-delete-filter-threshold");
+    this.streamFilterThreshold =
+        (testingThreshold != null)
+            ? Long.parseLong(testingThreshold)
+            : DEFAULT_STREAM_FILTER_THRESHOLD;
     this.filePath = filePath;
     this.counter = counter;
@@ -246,7 +251,7 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
     List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
 
     // if there are fewer deletes than a reasonable number to keep in memory, use a set
-    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < streamFilterThreshold) {
       PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, deletes);
       Predicate<T> isDeleted = record -> positionIndex.isDeleted(pos(record));
       return createDeleteIterable(records, isDeleted);
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 4924f07bf198..4335c94b5492 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -83,18 +83,22 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
   private final String format;
   private final boolean vectorized;
 
-  public TestSparkReaderDeletes(String format, boolean vectorized) {
+  public TestSparkReaderDeletes(String format, boolean vectorized, boolean useStreamDeleteFilter) {
     this.format = format;
     this.vectorized = vectorized;
+    System.clearProperty("iceberg.stream-delete-filter-threshold");
+    if (useStreamDeleteFilter) {
+      System.setProperty("iceberg.stream-delete-filter-threshold", "2");
+    }
   }
 
-  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}, useStreamDeleteFilter = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      new Object[] {"parquet", false},
-      new Object[] {"parquet", true},
-      new Object[] {"orc", false},
-      new Object[] {"avro", false}
+      new Object[] {"parquet", false, true},
+      new Object[] {"parquet", true, false},
+      new Object[] {"orc", false, true},
+      new Object[] {"avro", false, false}
     };
   }
@@ -132,6 +136,7 @@ public static void stopMetastoreAndSpark() throws Exception {
     metastore = null;
     spark.stop();
     spark = null;
+    System.clearProperty("iceberg.stream-delete-filter-threshold");
   }
 
   @Override
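Note: the constructor change above boils down to a read-system-property-or-fall-back pattern. The following is a minimal standalone sketch of that pattern, not part of the patch; the property name and default value mirror the diff, while the ThresholdSketch class, resolveThreshold method, and main method are invented for illustration.

// Hypothetical sketch (not the PR's code): same property name and default as
// DeleteFilter above; class and method names are invented.
public class ThresholdSketch {
  private static final long DEFAULT_STREAM_FILTER_THRESHOLD = 100_000L;
  private static final String PROP = "iceberg.stream-delete-filter-threshold";

  // Resolve the threshold the same way the patched constructor does:
  // use the system property when present, otherwise fall back to the default.
  static long resolveThreshold() {
    String testingThreshold = System.getProperty(PROP);
    return (testingThreshold != null)
        ? Long.parseLong(testingThreshold)
        : DEFAULT_STREAM_FILTER_THRESHOLD;
  }

  public static void main(String[] args) {
    System.out.println(resolveThreshold()); // 100000 (default)
    System.setProperty(PROP, "2");          // as the test constructor does
    System.out.println(resolveThreshold()); // 2
    System.clearProperty(PROP);             // as the test teardown does
  }
}

Setting the property to a tiny value such as "2" makes virtually any positional-delete workload exceed the threshold in applyPosDeletes, so the parameterized tests exercise the streaming filter path rather than the in-memory PositionDeleteIndex set.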