Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDel

public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes) {
CloseableIterator<Long> posDeletes) {
return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
}

public static <T> CloseableIterable<T> streamingMarker(CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes,
CloseableIterator<Long> posDeletes,
Consumer<T> markDeleted) {
return new PositionStreamDeleteMarker<>(rows, rowToPosition, posDeletes, markDeleted);
}
Expand Down Expand Up @@ -157,10 +157,10 @@ private abstract static class PositionStreamDeleteIterable<T> extends CloseableG
private long nextDeletePos;

PositionStreamDeleteIterable(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions) {
CloseableIterator<Long> deletePositions) {
this.rows = rows;
this.rowToPosition = rowToPosition;
this.deletePosIterator = deletePositions.iterator();
this.deletePosIterator = deletePositions;
}

@Override
Expand Down Expand Up @@ -203,7 +203,7 @@ boolean isDeleted(T row) {

private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteIterable<T> {
private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions) {
CloseableIterator<Long> deletePositions) {
super(rows, rowToPosition, deletePositions);
}

Expand All @@ -222,7 +222,7 @@ private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteI
private final Consumer<T> markDeleted;

PositionStreamDeleteMarker(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions, Consumer<T> markDeleted) {
CloseableIterator<Long> deletePositions, Consumer<T> markDeleted) {
super(rows, rowToPosition, deletePositions);
this.markDeleted = markDeleted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ public void testPositionStreamRowFilter() {

CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));

CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes);
CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows,
row -> row.get(0, Long.class),
deletes.iterator());
Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(1L, 2L, 5L, 6L, 8L),
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
Expand All @@ -137,7 +139,7 @@ public void testPositionStreamRowDeleteMarker() {

CloseableIterable<StructLike> actual = Deletes.streamingMarker(rows,
row -> row.get(0, Long.class), /* row to position */
deletes,
deletes.iterator(),
row -> row.set(2, true) /* delete marker */
);
Assert.assertEquals("Filter should produce expected rows",
Expand All @@ -163,7 +165,9 @@ public void testPositionStreamRowFilterWithDuplicates() {
CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(
Lists.newArrayList(0L, 0L, 0L, 3L, 4L, 7L, 7L, 9L, 9L, 9L));

CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes);
CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows,
row -> row.get(0, Long.class),
deletes.iterator());
Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(1L, 2L, 5L, 6L, 8L),
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
Expand All @@ -181,7 +185,9 @@ public void testPositionStreamRowFilterWithRowGaps() {

CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 2L, 3L, 4L, 7L, 9L));

CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes);
CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows,
row -> row.get(0, Long.class),
deletes.iterator());
Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(5L, 6L),
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
Expand Down Expand Up @@ -220,7 +226,8 @@ public void testCombinedPositionStreamRowFilter() {
CloseableIterable<StructLike> actual = Deletes.streamingFilter(
rows,
row -> row.get(0, Long.class),
Deletes.deletePositions("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2)));
Deletes.deletePositions("file_a.avro",
ImmutableList.of(positionDeletes1, positionDeletes2)).iterator());

Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(1L, 2L, 5L, 6L, 8L),
Expand Down
22 changes: 16 additions & 6 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
Expand Down Expand Up @@ -72,6 +73,7 @@ public abstract class DeleteFilter<T> {
private PositionDeleteIndex deleteRowPositions = null;
private List<Predicate<T>> isInDeleteSets = null;
private Predicate<T> eqDeleteRows = null;
private CloseableIterator<Long> deletePositionIte;

protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSchema, Schema requestedSchema) {
this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
Expand Down Expand Up @@ -128,6 +130,9 @@ protected long pos(T record) {
return (Long) posAccessor.get(asStructLike(record));
}

/**
* Please only close the last CloseableIterable if you want to reuse the filter
*/
public CloseableIterable<T> filter(CloseableIterable<T> records) {
return applyEqDeletes(applyPosDeletes(records));
}
Expand Down Expand Up @@ -222,18 +227,23 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
return records;
}

List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);

// if there are fewer deletes than a reasonable number to keep in memory, use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the else case? In that case we will not cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it won't cache because the deletes are streamed from files.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, Because it use sort merge to avoid loading too many delete rows in memory.

Copy link
Contributor Author

@shidayang shidayang Jul 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, They should't reopen position delete file and read position delete file from head, they should read position delete file continue。I will do some change here

PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, deletes);
Predicate<T> isDeleted = record -> positionIndex.isDeleted(pos(record));
if (deleteRowPositions == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should run line 225 to open the deletes if they can be reused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get you

List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
deleteRowPositions = Deletes.toPositionIndex(filePath, deletes);
}
Predicate<T> isDeleted = record -> deleteRowPositions.isDeleted(pos(record));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If deleteRowPositions is null, is this predicate at risk of an NPE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No NPE. The return of " Deletes.toPositionIndex(filePath, deletes)" must not be null.

return createDeleteIterable(records, isDeleted);
}

if (deletePositionIte == null) {
List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
deletePositionIte = Deletes.deletePositions(filePath, deletes).iterator();
}
return hasIsDeletedColumn ?
Deletes.streamingMarker(records, this::pos, Deletes.deletePositions(filePath, deletes), this::markRowDeleted) :
Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes));
Deletes.streamingMarker(records, this::pos, deletePositionIte, this::markRowDeleted) :
Deletes.streamingFilter(records, this::pos, deletePositionIte);
}

private CloseableIterable<T> createDeleteIterable(CloseableIterable<T> records, Predicate<T> isDeleted) {
Expand Down