diff --git a/core/src/main/java/org/apache/iceberg/deletes/DeleteSetter.java b/core/src/main/java/org/apache/iceberg/deletes/DeleteSetter.java new file mode 100644 index 000000000000..3ddd9cd704f4 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/DeleteSetter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.deletes; + +public abstract class DeleteSetter { + + private T row; + + protected T row() { + return this.row; + } + + public DeleteSetter wrap(T newRow) { + this.row = newRow; + return this; + } + + public abstract boolean isDeleted(); + + public abstract T setDeleted(); +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 62154f7d6071..e9b4e258f468 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -23,6 +23,7 @@ import java.io.UncheckedIOException; import java.util.Comparator; import java.util.List; +import java.util.NoSuchElementException; import java.util.Set; import java.util.function.Function; import org.apache.iceberg.Accessor; @@ -32,13 +33,13 @@ import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; -import org.apache.iceberg.io.FilterIterator; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DeleteMarker; import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; import org.apache.iceberg.util.StructLikeSet; @@ -67,14 +68,16 @@ public static CloseableIterable filter(CloseableIterable rows, Functio return equalityFilter.filter(rows); } - public static CloseableIterable filter(CloseableIterable rows, Function rowToPosition, - Set deleteSet) { + public static CloseableIterable filter(CloseableIterable rows, + Function rowToPosition, + Set deleteSet, + DeleteSetter mutationWrapper) { if 
(deleteSet.isEmpty()) { return rows; } - PositionSetDeleteFilter filter = new PositionSetDeleteFilter<>(rowToPosition, deleteSet); - return filter.filter(rows); + PositionSetDeleteMarker iterable = new PositionSetDeleteMarker<>(rowToPosition, deleteSet, mutationWrapper); + return iterable.setDeleted(rows); } public static StructLikeSet toEqualitySet(CloseableIterable eqDeletes, Types.StructType eqType) { @@ -107,10 +110,11 @@ public static Set toPositionSet(CloseableIterable posDeletes) { } } - public static CloseableIterable streamingFilter(CloseableIterable rows, - Function rowToPosition, - CloseableIterable posDeletes) { - return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes); + public static CloseableIterable streamingDeleteMarker(CloseableIterable rows, + Function rowToPosition, + CloseableIterable posDeletes, + DeleteSetter deleteSetter) { + return new PositionStreamDeleteMarker<>(rows, rowToPosition, posDeletes, deleteSetter); } public static CloseableIterable deletePositions(CharSequence dataLocation, @@ -143,31 +147,38 @@ protected boolean shouldKeep(T row) { } } - private static class PositionSetDeleteFilter extends Filter { + private static class PositionSetDeleteMarker extends DeleteMarker { private final Function rowToPosition; private final Set deleteSet; - private PositionSetDeleteFilter(Function rowToPosition, Set deleteSet) { + private PositionSetDeleteMarker(Function rowToPosition, + Set deleteSet, + DeleteSetter deleteSetter) { + super(deleteSetter); this.rowToPosition = rowToPosition; this.deleteSet = deleteSet; } @Override - protected boolean shouldKeep(T row) { - return !deleteSet.contains(rowToPosition.apply(row)); + protected boolean shouldDelete(T row) { + return deleteSet.contains(rowToPosition.apply(row)); } } - private static class PositionStreamDeleteFilter extends CloseableGroup implements CloseableIterable { + private static class PositionStreamDeleteMarker extends CloseableGroup implements CloseableIterable { 
private final CloseableIterable rows;
     private final Function extractPos;
     private final CloseableIterable deletePositions;
+    private final DeleteSetter deleteSetter;
 
-    private PositionStreamDeleteFilter(CloseableIterable rows, Function extractPos,
-                                       CloseableIterable deletePositions) {
+    private PositionStreamDeleteMarker(CloseableIterable rows,
+                                       Function extractPos,
+                                       CloseableIterable deletePositions,
+                                       DeleteSetter deleteSetter) {
       this.rows = rows;
       this.extractPos = extractPos;
       this.deletePositions = deletePositions;
+      this.deleteSetter = deleteSetter;
     }
 
     @Override
@@ -176,7 +187,7 @@ public CloseableIterator iterator() {
 
       CloseableIterator iter;
       if (deletePosIterator.hasNext()) {
-        iter = new PositionFilterIterator(rows.iterator(), deletePosIterator);
+        iter = new PositionDeleteMarkerIterator(rows.iterator(), deletePosIterator);
       } else {
         iter = rows.iterator();
         try {
@@ -191,21 +202,42 @@ public CloseableIterator iterator() {
       return iter;
     }
 
-    private class PositionFilterIterator extends FilterIterator {
+    private class PositionDeleteMarkerIterator implements CloseableIterator {
+      private final CloseableIterator items;
       private final CloseableIterator deletePosIterator;
       private long nextDeletePos;
 
-      protected PositionFilterIterator(CloseableIterator items, CloseableIterator deletePositions) {
-        super(items);
+      protected PositionDeleteMarkerIterator(CloseableIterator items, CloseableIterator deletePositions) {
+        this.items = items;
         this.deletePosIterator = deletePositions;
         this.nextDeletePos = deletePosIterator.next();
       }
 
       @Override
-      protected boolean shouldKeep(T row) {
+      public boolean hasNext() {
+        return items.hasNext();
+      }
+
+      @Override
+      public T next() {
+        if (!items.hasNext()) {
+          throw new NoSuchElementException();
+        }
+
+        T current = items.next();
+
+        deleteSetter.wrap(current); // bind the shared setter to this row once; both calls below reuse it
+        if (!deleteSetter.isDeleted() && shouldDelete(current)) {
+          return deleteSetter.setDeleted();
+        } else {
+          return current;
+        }
+      }
+
+      private boolean
shouldDelete(T row) { long currentPos = extractPos.apply(row); if (currentPos < nextDeletePos) { - return true; + return false; } // consume delete positions until the next is past the current position @@ -218,12 +250,17 @@ protected boolean shouldKeep(T row) { } } - return keep; + return !keep; } @Override public void close() { - super.close(); + try { + items.close(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close the iterator", e); + } + try { deletePosIterator.close(); } catch (IOException e) { diff --git a/core/src/main/java/org/apache/iceberg/util/DeleteMarker.java b/core/src/main/java/org/apache/iceberg/util/DeleteMarker.java new file mode 100644 index 000000000000..51389b569c75 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/DeleteMarker.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.util; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import org.apache.iceberg.deletes.DeleteSetter; +import org.apache.iceberg.io.CloseableIterable; + +public abstract class DeleteMarker { + + private final DeleteSetter deleteSetter; + + public DeleteMarker(DeleteSetter deleteSetter) { + this.deleteSetter = deleteSetter; + } + + protected abstract boolean shouldDelete(T item); + + private Iterable setDeleted(Iterable iterable) { + return () -> new InternalIterator(iterable.iterator()); + } + + public CloseableIterable setDeleted(CloseableIterable iterable) { + return CloseableIterable.combine(setDeleted((Iterable) iterable), iterable); + } + + private class InternalIterator implements Iterator { + private final Iterator iterator; + + private InternalIterator(Iterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + T current = iterator.next(); + + deleteSetter.wrap(current); + if (!deleteSetter.isDeleted() && shouldDelete(current)) { + return deleteSetter.setDeleted(); + } else { + return current; + } + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java index e6e5703df7cf..d9be0a2cea49 100644 --- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java +++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java @@ -111,7 +111,7 @@ public void testPositionStreamRowFilter() { CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L)); - CloseableIterable actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); + CloseableIterable actual = Deletes.streamingDeleteMarker(rows, row -> row.get(0, Long.class), deletes); 
Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(1L, 2L, 5L, 6L, 8L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); @@ -135,7 +135,7 @@ public void testPositionStreamRowFilterWithDuplicates() { CloseableIterable deletes = CloseableIterable.withNoopClose( Lists.newArrayList(0L, 0L, 0L, 3L, 4L, 7L, 7L, 9L, 9L, 9L)); - CloseableIterable actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); + CloseableIterable actual = Deletes.streamingDeleteMarker(rows, row -> row.get(0, Long.class), deletes); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(1L, 2L, 5L, 6L, 8L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); @@ -153,7 +153,7 @@ public void testPositionStreamRowFilterWithRowGaps() { CloseableIterable deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L)); - CloseableIterable actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes); + CloseableIterable actual = Deletes.streamingDeleteMarker(rows, row -> row.get(0, Long.class), deletes); Assert.assertEquals("Filter should produce expected rows", Lists.newArrayList(2L, 5L, 6L), Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class)))); @@ -189,7 +189,7 @@ public void testCombinedPositionStreamRowFilter() { Row.of(9L, "j") )); - CloseableIterable actual = Deletes.streamingFilter( + CloseableIterable actual = Deletes.streamingDeleteMarker( rows, row -> row.get(0, Long.class), Deletes.deletePositions("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))); diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 7a0752975e80..de7b17d73ecc 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -35,6 +35,7 @@ import 
org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.deletes.DeleteSetter; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; @@ -49,7 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.Filter; +import org.apache.iceberg.util.DeleteMarker; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; import org.apache.parquet.Preconditions; @@ -65,9 +66,13 @@ public abstract class DeleteFilter { private final List posDeletes; private final List eqDeletes; private final Schema requiredSchema; + private final DeleteSetter deleteSetter; private final Accessor posAccessor; - protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { + protected DeleteFilter(FileScanTask task, + Schema tableSchema, + Schema requestedSchema, + DeleteSetter deleteSetter) { this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; this.dataFile = task.file(); @@ -89,6 +94,7 @@ protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSc this.posDeletes = posDeleteBuilder.build(); this.eqDeletes = eqDeleteBuilder.build(); this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes); + this.deleteSetter = deleteSetter; this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); } @@ -109,13 +115,14 @@ protected long pos(T record) { } public CloseableIterable filter(CloseableIterable records) { - return applyEqDeletes(applyPosDeletes(records)); + CloseableIterable rawRecords = applyEqDeletes(applyPosDeletes(records)); + return CloseableIterable.filter(rawRecords, row -> !deleteSetter.wrap(row).isDeleted()); } - 
private List> applyEqDeletes() {
-    List> isInDeleteSets = Lists.newArrayList();
+  private Predicate buildEqDeletesPredicate() {
+    Predicate isDelete = t -> false;
     if (eqDeletes.isEmpty()) {
-      return isInDeleteSets;
+      return isDelete;
     }
 
     Multimap, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
@@ -140,42 +147,36 @@ private List> applyEqDeletes() {
           deleteSchema.asStruct());
 
       Predicate isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDelete = isDelete.or(isInDeleteSet);
     }
 
-    return isInDeleteSets;
+    return isDelete;
   }
 
-  public CloseableIterable findEqualityDeleteRows(CloseableIterable records) {
-    // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
-
-    Filter deletedRowsFilter = new Filter() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
-      }
-    };
-    return deletedRowsFilter.filter(records);
+  public CloseableIterable findDeletedRows(CloseableIterable records) {
+    CloseableIterable rawRecords = applyEqDeletes(applyPosDeletes(records));
+    return CloseableIterable.filter(rawRecords, row -> deleteSetter.wrap(row).isDeleted());
   }
 
+  /**
+   * Flags every row matched by an equality delete as deleted instead of dropping it.
+   *
+   * @param records rows to test against the equality delete sets
+   * @return all input rows; a matched row not already flagged is passed through {@code DeleteSetter#setDeleted()}
+   */
   private CloseableIterable applyEqDeletes(CloseableIterable records) {
-    // Predicate to test whether a row should be visible to user after applying equality deletions.
-    Predicate remainingRows = applyEqDeletes().stream()
-        .map(Predicate::negate)
-        .reduce(Predicate::and)
-        .orElse(t -> true);
+    // Predicate to test whether a row has been deleted by equality deletions. It must not be negated:
+    // DeleteMarker#shouldDelete has the opposite polarity of the Filter#shouldKeep it replaces.
+    Predicate deletedRows = buildEqDeletesPredicate();
 
-    Filter remainingRowsFilter = new Filter() {
+    DeleteMarker deleteMarker = new DeleteMarker(deleteSetter) {
       @Override
-      protected boolean shouldKeep(T item) {
-        return remainingRows.test(item);
+      protected boolean shouldDelete(T item) {
+        return deletedRows.test(item);
       }
     };
 
-    return remainingRowsFilter.filter(records);
+    return deleteMarker.setDeleted(records);
   }
 
   private CloseableIterable applyPosDeletes(CloseableIterable records) {
@@ -189,10 +190,14 @@ private CloseableIterable applyPosDeletes(CloseableIterable records) {
     if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
       return Deletes.filter(
           records, this::pos,
-          Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes)));
+          Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes)),
+          deleteSetter);
     }
 
-    return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes));
+    return Deletes.streamingDeleteMarker(records,
+        this::pos,
+        Deletes.deletePositions(dataFile.path(), deletes),
+        deleteSetter);
   }
 
   private CloseableIterable openPosDeletes(DeleteFile file) {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkDeleteSetter.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkDeleteSetter.java
new file mode 100644
index 000000000000..3aa67fd949d7
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkDeleteSetter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.UpdatableRowData; +import org.apache.iceberg.deletes.DeleteSetter; + +public class FlinkDeleteSetter extends DeleteSetter { + + public FlinkDeleteSetter() { + } + + private int deleteColumnIndex() { + return 0; + } + + @Override + public boolean isDeleted() { + return row().getBoolean(deleteColumnIndex()); + } + + @Override + public RowData setDeleted() { + RowData rowData = this.row(); + + int idx = deleteColumnIndex(); + if (rowData instanceof GenericRowData) { + ((GenericRowData) rowData).setField(idx, true); + } else if (rowData instanceof UpdatableRowData) { + ((UpdatableRowData) rowData).setField(idx, true); + } else { + throw new UnsupportedOperationException( + rowData.getClass() + " does not support set the _deleted column to be true."); + } + + return rowData; + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/DeletedRowReader.java similarity index 84% rename from spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java rename to spark/src/main/java/org/apache/iceberg/spark/source/DeletedRowReader.java index a85d85d6e117..81ee2134ac83 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/DeletedRowReader.java @@ -31,11 +31,11 @@ import 
org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; -public class EqualityDeleteRowReader extends RowDataReader { +public class DeletedRowReader extends RowDataReader { private final Schema expectedSchema; - public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping, - FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) { + public DeletedRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping, + FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) { super(task, schema, schema, nameMapping, io, encryptionManager, caseSensitive); this.expectedSchema = expectedSchema; } @@ -52,6 +52,6 @@ CloseableIterator open(FileScanTask task) { // update the current file for Spark's filename() function InputFileBlockHolder.set(file.path().toString(), task.start(), task.length()); - return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator(); + return matches.findDeletedRows(open(task, requiredSchema, idToConstant)).iterator(); } } diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 75b37c75f76c..76403f38fc9a 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -206,7 +206,7 @@ public void testReadEqualityDeleteRows() throws IOException { TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT); for (CombinedScanTask task : tasks) { - try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table.schema(), table.schema(), + try (DeletedRowReader reader = new DeletedRowReader(task, table.schema(), table.schema(), table.properties().get(DEFAULT_NAME_MAPPING), table.io(), table.encryption(), false)) { while (reader.next()) { 
actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));