Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public <T> T get(int pos, Class<T> javaClass) {

@Override
public <T> void set(int pos, T value) {
throw new UnsupportedOperationException("Setting values is not supported");
values[pos] = value;
}

@Override
Expand Down
145 changes: 78 additions & 67 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -63,14 +65,18 @@ public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Functio
return equalityFilter.filter(rows);
}

public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
PositionDeleteIndex deleteSet) {
if (deleteSet.isEmpty()) {
return rows;
}
public static <T> CloseableIterable<T> markDeleted(CloseableIterable<T> rows, Predicate<T> isDeleted,
Consumer<T> deleteMarker) {
return CloseableIterable.transform(rows, row -> {
if (isDeleted.test(row)) {
deleteMarker.accept(row);
}
return row;
});
}

PositionSetDeleteFilter<T> filter = new PositionSetDeleteFilter<>(rowToPosition, deleteSet);
return filter.filter(rows);
public static <T> CloseableIterable<T> filterDeleted(CloseableIterable<T> rows, Predicate<T> isDeleted) {
return CloseableIterable.filter(rows, isDeleted.negate());
}

public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
Expand Down Expand Up @@ -107,6 +113,13 @@ public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows
return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
}

public static <T> CloseableIterable<T> streamingMarker(CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes,
Consumer<T> markDeleted) {
return new PositionStreamDeleteMarker<>(rows, rowToPosition, posDeletes, markDeleted);
}

public static CloseableIterable<Long> deletePositions(CharSequence dataLocation,
CloseableIterable<StructLike> deleteFile) {
return deletePositions(dataLocation, ImmutableList.of(deleteFile));
Expand Down Expand Up @@ -137,93 +150,91 @@ protected boolean shouldKeep(T row) {
}
}

private static class PositionSetDeleteFilter<T> extends Filter<T> {
private final Function<T, Long> rowToPosition;
private final PositionDeleteIndex deleteSet;

private PositionSetDeleteFilter(Function<T, Long> rowToPosition, PositionDeleteIndex deleteSet) {
this.rowToPosition = rowToPosition;
this.deleteSet = deleteSet;
}

@Override
protected boolean shouldKeep(T row) {
return !deleteSet.isDeleted(rowToPosition.apply(row));
}
}

private static class PositionStreamDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {
private abstract static class PositionStreamDeleteIterable<T> extends CloseableGroup implements CloseableIterable<T> {
private final CloseableIterable<T> rows;
private final Function<T, Long> extractPos;
private final CloseableIterable<Long> deletePositions;
private final CloseableIterator<Long> deletePosIterator;
private final Function<T, Long> rowToPosition;
private long nextDeletePos;

private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T, Long> extractPos,
CloseableIterable<Long> deletePositions) {
PositionStreamDeleteIterable(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions) {
this.rows = rows;
this.extractPos = extractPos;
this.deletePositions = deletePositions;
this.rowToPosition = rowToPosition;
this.deletePosIterator = deletePositions.iterator();
}

@Override
public CloseableIterator<T> iterator() {
CloseableIterator<Long> deletePosIterator = deletePositions.iterator();

CloseableIterator<T> iter;
if (deletePosIterator.hasNext()) {
iter = new PositionFilterIterator(rows.iterator(), deletePosIterator);
nextDeletePos = deletePosIterator.next();
iter = applyDelete(rows.iterator());
} else {
iter = rows.iterator();
try {
deletePosIterator.close();
} catch (IOException e) {
throw new UncheckedIOException("Failed to close delete positions iterator", e);
}
}

addCloseable(iter);
addCloseable(deletePosIterator);

return iter;
}

private class PositionFilterIterator extends FilterIterator<T> {
private final CloseableIterator<Long> deletePosIterator;
private long nextDeletePos;
boolean isDeleted(T row) {
long currentPos = rowToPosition.apply(row);
if (currentPos < nextDeletePos) {
return false;
}

protected PositionFilterIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
super(items);
this.deletePosIterator = deletePositions;
// consume delete positions until the next is past the current position
Copy link
Member

@RussellSpitzer RussellSpitzer May 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is any simpler but we can remove one if statement, not sure if this is more clear.

    // Consume nextDeletePos till past currentPos, if currentPos equals any consumed nextDeletePos the current row has been deleted
    boolean isDeleted = currentPos == nextDeletePos;
    while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
      this.nextDeletePos = deletePosIterator.next();
      isDeleted |= currentPos == nextDeletePos
    }
  return isDeleted;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with either one. I kind of think if is easier to read. If we will make it simpler, here is another way, which we don't have to check on isDeleted. What do you think?

while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
      this.nextDeletePos = deletePosIterator.next();
      if (currentPos == nextDeletePos) {
        isDeleted = true;
      }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it as-is to avoid last-minute changes in this tricky place.

boolean isDeleted = currentPos == nextDeletePos;
while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
this.nextDeletePos = deletePosIterator.next();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now the only place in the code which refers to it as "this.nextDeletePos"

if (!isDeleted && currentPos == nextDeletePos) {
// if any delete position matches the current position
isDeleted = true;
}
Comment on lines +192 to +195
Copy link
Contributor Author

@flyrain flyrain May 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic isn't necessary. Not sure why it is there at the beginning. The only chance to go into the while loop is that there are duplicated pos deletes. For example, these positions are deleted, 0L, 0L, 1L, 3L, 3L. It is not necessary to set anything in that case, line 185 already check it. To provide more context, both row position and pos deletes are sorted. Check testPositionStreamRowFilterWithDuplicates for details.
Double checked, these logic are needed. Modified the test testPositionStreamRowFilterWithRowGaps to cover this code path.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here is tricky but seems correct to me.

}

@Override
protected boolean shouldKeep(T row) {
long currentPos = extractPos.apply(row);
if (currentPos < nextDeletePos) {
return true;
}
return isDeleted;
}

protected abstract CloseableIterator<T> applyDelete(CloseableIterator<T> items);
}

private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteIterable<T> {
private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions) {
super(rows, rowToPosition, deletePositions);
}

// consume delete positions until the next is past the current position
boolean keep = currentPos != nextDeletePos;
while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
this.nextDeletePos = deletePosIterator.next();
if (keep && currentPos == nextDeletePos) {
// if any delete position matches the current position, discard
keep = false;
}
@Override
protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
return new FilterIterator<T>(items) {
@Override
protected boolean shouldKeep(T item) {
return !isDeleted(item);
}
};
}
}

return keep;
}
private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteIterable<T> {
private final Consumer<T> markDeleted;

PositionStreamDeleteMarker(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
CloseableIterable<Long> deletePositions, Consumer<T> markDeleted) {
super(rows, rowToPosition, deletePositions);
this.markDeleted = markDeleted;
}

@Override
public void close() {
super.close();
try {
deletePosIterator.close();
} catch (IOException e) {
throw new UncheckedIOException("Failed to close delete positions iterator", e);
@Override
protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
return CloseableIterator.transform(items, row -> {
if (isDeleted(row)) {
markDeleted.accept(row);
}
}
return row;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.deletes;

import java.util.List;
import java.util.function.Predicate;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TestHelpers.Row;
Expand Down Expand Up @@ -117,6 +118,33 @@ public void testPositionStreamRowFilter() {
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
}

@Test
public void testPositionStreamRowDeleteMarker() {
CloseableIterable<StructLike> rows = CloseableIterable.withNoopClose(Lists.newArrayList(
Row.of(0L, "a", false),
Row.of(1L, "b", false),
Row.of(2L, "c", false),
Row.of(3L, "d", false),
Row.of(4L, "e", false),
Row.of(5L, "f", false),
Row.of(6L, "g", false),
Row.of(7L, "h", false),
Row.of(8L, "i", false),
Row.of(9L, "j", false)
));

CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));

CloseableIterable<StructLike> actual = Deletes.streamingMarker(rows,
row -> row.get(0, Long.class), /* row to position */
deletes,
row -> row.set(2, true) /* delete marker */
);
Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(true, false, false, true, true, false, false, true, false, true),
Lists.newArrayList(Iterables.transform(actual, row -> row.get(2, Boolean.class))));
}

@Test
public void testPositionStreamRowFilterWithDuplicates() {
CloseableIterable<StructLike> rows = CloseableIterable.withNoopClose(Lists.newArrayList(
Expand Down Expand Up @@ -151,11 +179,11 @@ public void testPositionStreamRowFilterWithRowGaps() {
Row.of(6L, "g")
));

CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));
CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 2L, 3L, 4L, 7L, 9L));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is to cover the code path line 188-191 in class Deletes


CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes);
Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(2L, 5L, 6L),
Lists.newArrayList(5L, 6L),
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
}

Expand Down Expand Up @@ -216,9 +244,8 @@ public void testPositionSetRowFilter() {

CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));

CloseableIterable<StructLike> actual = Deletes.filter(
rows, row -> row.get(0, Long.class),
Deletes.toPositionIndex(deletes));
Predicate<StructLike> shouldKeep = row -> !Deletes.toPositionIndex(deletes).isDeleted(row.get(0, Long.class));
CloseableIterable<StructLike> actual = CloseableIterable.filter(rows, shouldKeep);
Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(1L, 2L, 5L, 6L, 8L),
Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
Expand Down Expand Up @@ -254,9 +281,11 @@ public void testCombinedPositionSetRowFilter() {
Row.of(9L, "j")
));

CloseableIterable<StructLike> actual = Deletes.filter(
rows, row -> row.get(0, Long.class),
Deletes.toPositionIndex("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2)));
Predicate<StructLike> isDeleted = row -> Deletes
.toPositionIndex("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))
.isDeleted(row.get(0, Long.class));

CloseableIterable<StructLike> actual = CloseableIterable.filter(rows, isDeleted.negate());

Assert.assertEquals("Filter should produce expected rows",
Lists.newArrayList(1L, 2L, 5L, 6L, 8L),
Expand Down
54 changes: 29 additions & 25 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;

Expand All @@ -67,6 +66,8 @@ public abstract class DeleteFilter<T> {
private final List<DeleteFile> eqDeletes;
private final Schema requiredSchema;
private final Accessor<StructLike> posAccessor;
private final boolean hasIsDeletedColumn;
private final int isDeletedColumnPosition;

private PositionDeleteIndex deleteRowPositions = null;
private Predicate<T> eqDeleteRows = null;
Expand Down Expand Up @@ -94,6 +95,12 @@ protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSc
this.eqDeletes = eqDeleteBuilder.build();
this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
this.hasIsDeletedColumn = requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
this.isDeletedColumnPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED);
}

protected int columnIsDeletedPosition() {
Copy link
Contributor

@aokolnychyi aokolnychyi May 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: isDeletedColumnPosition()?

return isDeletedColumnPosition;
}

public Schema requiredSchema() {
Expand Down Expand Up @@ -169,30 +176,19 @@ public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records)
.reduce(Predicate::or)
.orElse(t -> false);

Filter<T> deletedRowsFilter = new Filter<T>() {
@Override
protected boolean shouldKeep(T item) {
return deletedRows.test(item);
}
};
return deletedRowsFilter.filter(records);
return CloseableIterable.filter(records, deletedRows);
}

private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
// Predicate to test whether a row should be visible to user after applying equality deletions.
Predicate<T> remainingRows = applyEqDeletes().stream()
.map(Predicate::negate)
.reduce(Predicate::and)
.orElse(t -> true);

Filter<T> remainingRowsFilter = new Filter<T>() {
@Override
protected boolean shouldKeep(T item) {
return remainingRows.test(item);
}
};
Predicate<T> isEqDeleted = applyEqDeletes().stream()
.reduce(Predicate::or)
.orElse(t -> false);

return remainingRowsFilter.filter(records);
return createDeleteIterable(records, isEqDeleted);
}

protected void markRowDeleted(T item) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement markRowDeleted");
}

public Predicate<T> eqDeletedRowFilter() {
Expand Down Expand Up @@ -226,10 +222,20 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {

// if there are fewer deletes than a reasonable number to keep in memory, use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
return Deletes.filter(records, this::pos, Deletes.toPositionIndex(filePath, deletes));
PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, deletes);
Predicate<T> isDeleted = record -> positionIndex.isDeleted(pos(record));
return createDeleteIterable(records, isDeleted);
}

return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes));
return hasIsDeletedColumn ?
Deletes.streamingMarker(records, this::pos, Deletes.deletePositions(filePath, deletes), this::markRowDeleted) :
Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes));
}

private CloseableIterable<T> createDeleteIterable(CloseableIterable<T> records, Predicate<T> isDeleted) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better name? Is it an iterable of remaining rows?

return hasIsDeletedColumn ?
Deletes.markDeleted(records, isDeleted, this::markRowDeleted) :
Deletes.filterDeleted(records, isDeleted);
}

private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
Expand Down Expand Up @@ -290,8 +296,6 @@ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
requiredIds.addAll(eqDelete.equalityFieldIds());
}

requiredIds.add(MetadataColumns.IS_DELETED.fieldId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you comment on why this was removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't add the metadata column if user doesn't ask a project or filter on it, right?
It was added by #2538. Hi @chenjunjiedada, any idea why we need this line in #2538?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already project an extra position column, right? It depends on whether the projected column is useful. The projected _delete column can be used for converting delete as I wrote if I recall. The CDC changes may also need it for joining, right?

Copy link
Contributor Author

@flyrain flyrain May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We project the pos column only if there are pos deletes as the following code shows, which makes sense, since we need it for filtering pos deletes.

    if (!posDeletes.isEmpty()) {
      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
    }

Here is my thought on Is_deleted column, it presents only if the front end(e.g. spark read) asked for it. For example, in case of CDC, we put it in the filter to read deleted rows. Here is the code from my CDC draft PR #4539.

    Dataset<Row> scanDF = spark().read().format("iceberg")
        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
        .load(table.name())
        .filter(functions.column(MetadataColumns.IS_DELETED.name()).equalTo(true));

What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me!


Set<Integer> missingIds = Sets.newLinkedHashSet(
Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema)));

Expand Down
Loading