49 changes: 47 additions & 2 deletions core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -22,6 +22,8 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.MetadataColumns;
@@ -107,6 +109,13 @@ public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows
return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
}

public static <T> CloseableIterable<T> streamingDeletedRowMarker(CloseableIterable<T> rows,
Function<T, Long> rowToPosition,
CloseableIterable<Long> posDeletes,
Consumer<T> deleteMarker) {
return new PositionStreamDeletedRowMarker<>(rows, rowToPosition, posDeletes, deleteMarker);
}

public static CloseableIterable<Long> deletePositions(CharSequence dataLocation,
CloseableIterable<StructLike> deleteFile) {
return deletePositions(dataLocation, ImmutableList.of(deleteFile));
@@ -170,7 +179,7 @@ public CloseableIterator<T> iterator() {

CloseableIterator<T> iter;
if (deletePosIterator.hasNext()) {
iter = new PositionFilterIterator(rows.iterator(), deletePosIterator);
iter = positionIterator(rows.iterator(), deletePosIterator);
} else {
iter = rows.iterator();
try {
Expand All @@ -185,7 +194,12 @@ public CloseableIterator<T> iterator() {
return iter;
}

private class PositionFilterIterator extends FilterIterator<T> {
protected FilterIterator<T> positionIterator(CloseableIterator<T> items,
CloseableIterator<Long> newDeletePositions) {
return new PositionFilterIterator(items, newDeletePositions);
}

protected class PositionFilterIterator extends FilterIterator<T> {
private final CloseableIterator<Long> deletePosIterator;
private long nextDeletePos;

@@ -227,6 +241,37 @@ public void close() {
}
}

static class PositionStreamDeletedRowMarker<T> extends PositionStreamDeleteFilter<T> {
private final Consumer<T> deleteMarker;

private PositionStreamDeletedRowMarker(CloseableIterable<T> rows, Function<T, Long> extractPos,
CloseableIterable<Long> deletePositions,
Consumer<T> deleteMarker) {
super(rows, extractPos, deletePositions);
this.deleteMarker = deleteMarker;
}

@Override
protected FilterIterator<T> positionIterator(CloseableIterator<T> items,
CloseableIterator<Long> deletePositions) {
return new PositionMarkerIterator(items, deletePositions);
}

private class PositionMarkerIterator extends PositionFilterIterator {
private PositionMarkerIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
super(items, deletePositions);
}

@Override
protected boolean shouldKeep(T row) {
if (!super.shouldKeep(row)) {
deleteMarker.accept(row);
[Review comment — Member] Personally, it seems strange to modify the original row (setting the _deleted flag to true) inside a shouldKeep method: shouldKeep should be a pure test and should not mutate the record. Otherwise it is easy to cause confusion — after a record goes through a different sequence of shouldKeep tests, the final result differs, and the original record has even been modified as a side effect.

[Review comment — Member] I'm trying to add a DeleteMarker that sets the _is_deleted flag iteratively; please see PR #2434.
}
return true;
}
}
}

private static class DataFileFilter<T extends StructLike> extends Filter<T> {
private final CharSequence dataLocation;

161 changes: 139 additions & 22 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DeleteFile;
@@ -67,6 +68,7 @@ public abstract class DeleteFilter<T> {
private final List<DeleteFile> eqDeletes;
private final Schema requiredSchema;
private final Accessor<StructLike> posAccessor;
private Integer deleteMarkerIndex = null;

private PositionDeleteIndex deleteRowPositions = null;
private Predicate<T> eqDeleteRows = null;
@@ -100,6 +102,29 @@ public Schema requiredSchema() {
return requiredSchema;
}

protected int deleteMarkerIndex() {
if (deleteMarkerIndex != null) {
return deleteMarkerIndex;
}

int index = 0;
for (Types.NestedField field : requiredSchema().columns()) {
if (field.fieldId() != MetadataColumns.IS_DELETED.fieldId()) {
index = index + 1;
} else {
break;
}
}

deleteMarkerIndex = index;

return deleteMarkerIndex;
}

protected abstract Consumer<T> deleteMarker();

protected abstract boolean isDeletedRow(T row);

public boolean hasPosDeletes() {
return !posDeletes.isEmpty();
}
@@ -124,11 +149,20 @@ public CloseableIterable<T> filter(CloseableIterable<T> records) {
return applyEqDeletes(applyPosDeletes(records));
}

private List<Predicate<T>> applyEqDeletes() {
List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
private Filter<T> deletedRowsSelector() {
return new Filter<T>() {
@Override
protected boolean shouldKeep(T item) {
return isDeletedRow(item);
}
};
}

private Predicate<T> buildEqDeletePredicate() {
if (eqDeletes.isEmpty()) {
return isInDeleteSets;
return null;
}
Predicate<T> isDeleted = null;

Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
for (DeleteFile delete : eqDeletes) {
@@ -156,43 +190,126 @@ private List<Predicate<T>> applyEqDeletes() {
records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
deleteSchema.asStruct());

Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
isInDeleteSets.add(isInDeleteSet);
isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
}

return isDeleted;
}

private Predicate<T> buildPosDeletePredicate() {
if (posDeletes.isEmpty()) {
return null;
}

Predicate<T> pred = null;

for (DeleteFile posDelete : posDeletes) {
CloseableIterable<Record> deleteRecords = openPosDeletes(posDelete);
Set<Long> deleteRecordSet = Deletes.toPositionSet(dataFile.path(), deleteRecords);
if (!deleteRecordSet.isEmpty()) {
pred = pred == null ? r -> deleteRecordSet.contains(pos(r)) : pred.or(r -> deleteRecordSet.contains(pos(r)));
}
}

return pred;
}

public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
if (isDeletedFromPosDeletes == null) {
return keepRowsFromEqualityDeletes(records);
}

Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
if (isDeletedFromEqDeletes == null) {
return keepRowsFromPosDeletes(records);
}

return isInDeleteSets;
CloseableIterable<T> markedRecords;

if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
markedRecords = CloseableIterable.transform(records, record -> {
if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
deleteMarker().accept(record);
}
return record;
});

} else {
List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,
Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()), record -> {
if (!isDeletedRow(record) && isDeletedFromEqDeletes.test(record)) {
deleteMarker().accept(record);
}
return record;
});
}
return deletedRowsSelector().filter(markedRecords);
}

public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
private CloseableIterable<T> selectRowsFromDeletes(CloseableIterable<T> records, Predicate<T> isDeleted) {
CloseableIterable<T> markedRecords = CloseableIterable.transform(records, record -> {
if (isDeleted.test(record)) {
deleteMarker().accept(record);
}
return record;
});

return deletedRowsSelector().filter(markedRecords);
}

public CloseableIterable<T> keepRowsFromEqualityDeletes(CloseableIterable<T> records) {
// Predicate to test whether a row has been deleted by equality deletions.
Predicate<T> deletedRows = applyEqDeletes().stream()
.reduce(Predicate::or)
.orElse(t -> false);
Predicate<T> isDeleted = buildEqDeletePredicate();
if (isDeleted == null) {
return CloseableIterable.empty();
}

Filter<T> deletedRowsFilter = new Filter<T>() {
@Override
protected boolean shouldKeep(T item) {
return deletedRows.test(item);
return selectRowsFromDeletes(records, isDeleted);
}

public CloseableIterable<T> keepRowsFromPosDeletes(CloseableIterable<T> records) {
[Review comment — Contributor] Is there a need to make these methods public? Or will rows only be read through keepRowsFromDeletes? What is the use case for these changes?

[Review comment — Collaborator, Author] They are used in DeleteRowReader, which is in the Spark module.
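For context, a hedged sketch of that call pattern — a caller that wants only the rows removed by a task's delete files. DeletedRowsSketch is an illustrative wrapper, not the actual DeleteRowReader; it relies only on keepRowsFromDeletes as defined in this diff:

import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

// Illustrative only: keepRowsFromDeletes marks each row hit by a position
// or equality delete and then selects just the marked rows.
class DeletedRowsSketch {
  static CloseableIterable<Record> deletedRows(DeleteFilter<Record> filter,
                                               CloseableIterable<Record> rows) {
    return filter.keepRowsFromDeletes(rows);
  }
}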

// if there are fewer deletes than a reasonable number to keep in memory, use a set
if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
// Predicate to test whether a row has been deleted by equality deletions.
Predicate<T> isDeleted = buildPosDeletePredicate();
if (isDeleted == null) {
return CloseableIterable.empty();
}
};
return deletedRowsFilter.filter(records);
return selectRowsFromDeletes(records, isDeleted);
} else {
List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
CloseableIterable<T> markedRecords = Deletes.streamingDeletedRowMarker(records, this::pos,
Deletes.deletePositions(dataFile.path(), deletes), deleteMarker());

return deletedRowsSelector().filter(markedRecords);
}
}

private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
// Predicate to test whether a row should be visible to user after applying equality deletions.
Predicate<T> remainingRows = applyEqDeletes().stream()
.map(Predicate::negate)
.reduce(Predicate::and)
.orElse(t -> true);
Predicate<T> isDeleted = buildEqDeletePredicate();
if (isDeleted == null) {
return records;
}

CloseableIterable<T> markedRecords = CloseableIterable.transform(records, record -> {
if (isDeleted.test(record)) {
deleteMarker().accept(record);
}
return record;
});

Filter<T> remainingRowsFilter = new Filter<T>() {
@Override
protected boolean shouldKeep(T item) {
return remainingRows.test(item);
return !isDeletedRow(item);
}
};

return remainingRowsFilter.filter(records);
return remainingRowsFilter.filter(markedRecords);
}

public Predicate<T> eqDeletedRowFilter() {
@@ -19,6 +19,7 @@

package org.apache.iceberg.data;

import java.util.function.Consumer;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
@@ -40,6 +41,16 @@ protected long pos(Record record) {
return (Long) posAccessor().get(record);
}

@Override
protected Consumer<Record> deleteMarker() {
return record -> record.set(deleteMarkerIndex(), true);
}

@Override
protected boolean isDeletedRow(Record record) {
return record.get(deleteMarkerIndex(), Boolean.class);
}

@Override
protected StructLike asStructLike(Record record) {
return asStructLike.wrap(record);
@@ -49,4 +60,5 @@ protected StructLike asStructLike(Record record) {
protected InputFile getInputFile(String location) {
return io.newInputFile(location);
}

}
@@ -20,8 +20,11 @@
package org.apache.iceberg.flink.source;

import java.util.Map;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.UpdatableRowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
@@ -178,6 +181,24 @@ private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
this.inputFilesDecryptor = inputFilesDecryptor;
}

@Override
protected Consumer<RowData> deleteMarker() {
return record -> {
if (record instanceof GenericRowData) {
((GenericRowData) record).setField(deleteMarkerIndex(), true);
} else if (record instanceof UpdatableRowData) {
((UpdatableRowData) record).setField(deleteMarkerIndex(), true);
} else {
throw new UnsupportedOperationException("Can not mark row data");
}
};
}

@Override
protected boolean isDeletedRow(RowData row) {
return row.getBoolean(deleteMarkerIndex());
}

public RowType requiredRowType() {
return requiredRowType;
}