13 changes: 9 additions & 4 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -58,11 +58,11 @@

public abstract class DeleteFilter<T> {
private static final Logger LOG = LoggerFactory.getLogger(DeleteFilter.class);
- private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;
+ private static final long DEFAULT_STREAM_FILTER_THRESHOLD = 100_000L;
private static final Schema POS_DELETE_SCHEMA =
new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);

- private final long setFilterThreshold;
+ private final long streamFilterThreshold;
Review comment from @wypoon (Contributor, Author), Sep 24, 2022:
I think calling this the streamFilterThreshold is more appropriate, since it is the threshold at which we use the streaming delete filter.

private final String filePath;
private final List<DeleteFile> posDeletes;
private final List<DeleteFile> eqDeletes;
@@ -82,7 +82,12 @@ protected DeleteFilter(
Schema tableSchema,
Schema requestedSchema,
DeleteCounter counter) {
- this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
+ // For testing purposes only, we may set the stream filter threshold via a system property
+ String testingThreshold = System.getProperty("iceberg.stream-delete-filter-threshold");
+ this.streamFilterThreshold =
+     (testingThreshold != null)
+         ? Long.parseLong(testingThreshold)
+         : DEFAULT_STREAM_FILTER_THRESHOLD;
this.filePath = filePath;
this.counter = counter;

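Aside: the null-check-plus-parseLong fallback above could also be written with the JDK's Long.getLong helper. The behavioral difference is that Long.getLong silently falls back to the default when the property is unset or unparseable, whereas Long.parseLong fails fast on a malformed value. A minimal sketch of that alternative (not what the PR uses):

    // Hypothetical alternative to the explicit System.getProperty null check:
    // Long.getLong returns the default when the property is missing or malformed.
    long threshold =
        Long.getLong("iceberg.stream-delete-filter-threshold", DEFAULT_STREAM_FILTER_THRESHOLD);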
@@ -246,7 +251,7 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);

// if there are fewer deletes than a reasonable number to keep in memory, use a set
- if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+ if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < streamFilterThreshold) {
PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, deletes);
Predicate<T> isDeleted = record -> positionIndex.isDeleted(pos(record));
return createDeleteIterable(records, isDeleted);
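The diff is truncated here; when the total position-delete count reaches the threshold, the method falls through to the streaming path instead. A sketch of that branch, assuming the Deletes.deletePositions and Deletes.streamingFilter helpers from org.apache.iceberg.deletes (exact signatures vary across Iceberg versions):

    // Assumed shape of the streaming branch (not shown in this diff): merge the
    // sorted position-delete streams against the row positions instead of
    // materializing all deleted positions in an in-memory set.
    CloseableIterable<Long> deletePositions = Deletes.deletePositions(filePath, deletes);
    return Deletes.streamingFilter(records, this::pos, deletePositions);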
TestSparkReaderDeletes.java
@@ -83,18 +83,22 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
private final String format;
private final boolean vectorized;

- public TestSparkReaderDeletes(String format, boolean vectorized) {
+ public TestSparkReaderDeletes(String format, boolean vectorized, boolean useStreamDeleteFilter) {
this.format = format;
this.vectorized = vectorized;
+ System.clearProperty("iceberg.stream-delete-filter-threshold");
+ if (useStreamDeleteFilter) {
+   System.setProperty("iceberg.stream-delete-filter-threshold", "2");
+ }
}

@Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
@Parameterized.Parameters(name = "format = {0}, vectorized = {1}, useStreamDeleteFilter = {2}")
public static Object[][] parameters() {
return new Object[][] {
new Object[] {"parquet", false},
new Object[] {"parquet", true},
new Object[] {"orc", false},
new Object[] {"avro", false}
new Object[] {"parquet", false, true},
new Object[] {"parquet", true, false},
new Object[] {"orc", false, true},
new Object[] {"avro", false, false}
};
}

@@ -132,6 +136,7 @@ public static void stopMetastoreAndSpark() throws Exception {
metastore = null;
spark.stop();
spark = null;
+ System.clearProperty("iceberg.stream-delete-filter-threshold");
}

@Override
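A design note on the test changes: system properties are JVM-global, so a constructor that sets iceberg.stream-delete-filter-threshold must also clear it, as both the constructor and the teardown above do; otherwise the value leaks into later tests in the same JVM. A minimal standalone illustration of the set/clear pattern (hypothetical example, not from the PR):

    // Hypothetical standalone illustration of the set/clear discipline used in
    // the test above. A system property persists for the life of the JVM, so a
    // value set for one test run must be cleared before later tests read it.
    public class StreamFilterPropertyDemo {
      private static final String PROP = "iceberg.stream-delete-filter-threshold";

      public static void main(String[] args) {
        System.setProperty(PROP, "2"); // force the streaming path even for tiny delete counts
        System.out.println("threshold = " + System.getProperty(PROP)); // prints 2
        System.clearProperty(PROP); // restore default behavior
        System.out.println("after clear = " + System.getProperty(PROP)); // prints null
      }
    }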