diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index af5d95339383..3805d6759fc9 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.iceberg.Accessor;
 import org.apache.iceberg.MetadataColumns;
@@ -107,6 +109,13 @@ public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows,
     return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
   }

+  public static <T> CloseableIterable<T> streamingDeletedRowMarker(CloseableIterable<T> rows,
+      Function<T, Long> rowToPosition,
+      CloseableIterable<Long> posDeletes,
+      Consumer<T> deleteMarker) {
+    return new PositionStreamDeletedRowMarker<>(rows, rowToPosition, posDeletes, deleteMarker);
+  }
+
   public static CloseableIterable<Long> deletePositions(CharSequence dataLocation,
       CloseableIterable<Record> deleteFile) {
     return deletePositions(dataLocation, ImmutableList.of(deleteFile));
@@ -170,7 +179,7 @@ public CloseableIterator<T> iterator() {
       CloseableIterator<T> iter;
       if (deletePosIterator.hasNext()) {
-        iter = new PositionFilterIterator(rows.iterator(), deletePosIterator);
+        iter = positionIterator(rows.iterator(), deletePosIterator);
       } else {
         iter = rows.iterator();
         try {
@@ -185,7 +194,12 @@ public CloseableIterator<T> iterator() {
       return iter;
     }

-    private class PositionFilterIterator extends FilterIterator<T> {
+    protected FilterIterator<T> positionIterator(CloseableIterator<T> items,
+        CloseableIterator<Long> newDeletePositions) {
+      return new PositionFilterIterator(items, newDeletePositions);
+    }
+
+    protected class PositionFilterIterator extends FilterIterator<T> {
       private final CloseableIterator<Long> deletePosIterator;
       private long nextDeletePos;
@@ -227,6 +241,37 @@ public void close() {
     }
   }

+  static class PositionStreamDeletedRowMarker<T> extends PositionStreamDeleteFilter<T> {
+    private final Consumer<T> deleteMarker;
+
+    private PositionStreamDeletedRowMarker(CloseableIterable<T> rows, Function<T, Long> extractPos,
+        CloseableIterable<Long> deletePositions,
+        Consumer<T> deleteMarker) {
+      super(rows, extractPos, deletePositions);
+      this.deleteMarker = deleteMarker;
+    }
+
+    @Override
+    protected FilterIterator<T> positionIterator(CloseableIterator<T> items,
+        CloseableIterator<Long> deletePositions) {
+      return new PositionMarkerIterator(items, deletePositions);
+    }
+
+    private class PositionMarkerIterator extends PositionFilterIterator {
+      private PositionMarkerIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
+        super(items, deletePositions);
+      }
+
+      @Override
+      protected boolean shouldKeep(T row) {
+        if (!super.shouldKeep(row)) {
+          deleteMarker.accept(row);
+        }
+        return true;
+      }
+    }
+  }
+
   private static class DataFileFilter<T extends StructLike> extends Filter<T> {
     private final CharSequence dataLocation;
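The new PositionStreamDeletedRowMarker inverts the base filter's contract: where PositionFilterIterator drops a row whose position matches a delete, the marker subclass reuses the parent's matching logic but keeps every row, tagging the matches through a callback. A minimal standalone sketch of that mark-instead-of-drop pattern, with hypothetical names (plain java.util types, not the Iceberg iterator classes):

import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Sketch: keep every row, but invoke a callback on the ones a plain filter
// would have dropped. PositionStreamDeletedRowMarker achieves the same by
// overriding shouldKeep(row) to mark and then return true.
class MarkingIterator<T> implements Iterator<T> {
  private final Iterator<T> rows;
  private final Predicate<T> isDeleted;    // stands in for !super.shouldKeep(row)
  private final Consumer<T> deleteMarker;  // e.g. sets a _deleted flag on the row

  MarkingIterator(Iterator<T> rows, Predicate<T> isDeleted, Consumer<T> deleteMarker) {
    this.rows = rows;
    this.isDeleted = isDeleted;
    this.deleteMarker = deleteMarker;
  }

  @Override
  public boolean hasNext() {
    return rows.hasNext();
  }

  @Override
  public T next() {
    T row = rows.next();
    if (isDeleted.test(row)) {
      deleteMarker.accept(row); // mark, but do not filter out
    }
    return row;
  }
}

Downstream, a selector that keeps only the marked rows recovers exactly the deleted set; that is how the DeleteFilter changes below build their keepRowsFrom* entry points.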
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 8d420487943b..9036b57cbe2e 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import org.apache.iceberg.Accessor;
 import org.apache.iceberg.DeleteFile;
@@ -67,6 +68,7 @@ public abstract class DeleteFilter<T> {
   private final List<DeleteFile> eqDeletes;
   private final Schema requiredSchema;
   private final Accessor<StructLike> posAccessor;
+  private Integer deleteMarkerIndex = null;
   private PositionDeleteIndex deleteRowPositions = null;
   private Predicate<T> eqDeleteRows = null;
@@ -100,6 +102,29 @@ public Schema requiredSchema() {
     return requiredSchema;
   }

+  protected int deleteMarkerIndex() {
+    if (deleteMarkerIndex != null) {
+      return deleteMarkerIndex;
+    }
+
+    int index = 0;
+    for (Types.NestedField field : requiredSchema().columns()) {
+      if (field.fieldId() != MetadataColumns.IS_DELETED.fieldId()) {
+        index = index + 1;
+      } else {
+        break;
+      }
+    }
+
+    deleteMarkerIndex = index;
+
+    return deleteMarkerIndex;
+  }
+
+  protected abstract Consumer<T> deleteMarker();
+
+  protected abstract boolean isDeletedRow(T row);
+
   public boolean hasPosDeletes() {
     return !posDeletes.isEmpty();
   }
@@ -124,11 +149,20 @@ public CloseableIterable<T> filter(CloseableIterable<T> records) {
     return applyEqDeletes(applyPosDeletes(records));
   }

-  private List<Predicate<T>> applyEqDeletes() {
-    List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
+  private Filter<T> deletedRowsSelector() {
+    return new Filter<T>() {
+      @Override
+      protected boolean shouldKeep(T item) {
+        return isDeletedRow(item);
+      }
+    };
+  }
+
+  private Predicate<T> buildEqDeletePredicate() {
     if (eqDeletes.isEmpty()) {
-      return isInDeleteSets;
+      return null;
     }
+    Predicate<T> isDeleted = null;
     Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
     for (DeleteFile delete : eqDeletes) {
@@ -156,43 +190,126 @@ private List<Predicate<T>> applyEqDeletes() {
           records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)),
           deleteSchema.asStruct());

-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+          isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }

-    return isInDeleteSets;
+    return isDeleted;
+  }
+
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    Predicate<T> pred = null;
+
+    for (DeleteFile posDelete : posDeletes) {
+      CloseableIterable<Record> deleteRecords = openPosDeletes(posDelete);
+      Set<Long> deleteRecordSet = Deletes.toPositionSet(dataFile.path(), deleteRecords);
+      if (!deleteRecordSet.isEmpty()) {
+        pred = pred == null ? r -> deleteRecordSet.contains(pos(r)) : pred.or(r -> deleteRecordSet.contains(pos(r)));
+      }
+    }
+
+    return pred;
+  }
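buildEqDeletePredicate() and buildPosDeletePredicate() both fold one predicate per delete set into a single test, seeded with null so that "no deletes at all" stays distinguishable from "a predicate that matches nothing". A self-contained sketch of that null-seeded Predicate.or chaining (hypothetical helper, plain java.util):

import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Sketch of the null-seeded Predicate.or chaining used by the two
// build*DeletePredicate() methods: a null result means no delete file
// contributed anything, letting callers skip filtering entirely.
class PredicateChaining {
  static <T> Predicate<T> anyMatch(List<Set<T>> deleteSets) {
    Predicate<T> isDeleted = null;
    for (Set<T> deleteSet : deleteSets) {
      if (deleteSet.isEmpty()) {
        continue; // empty delete files contribute nothing
      }
      Predicate<T> inThisSet = deleteSet::contains;
      isDeleted = isDeleted == null ? inThisSet : isDeleted.or(inThisSet);
    }
    return isDeleted; // may be null: the caller returns the input unchanged
  }
}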
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+    if (isDeletedFromPosDeletes == null) {
+      return keepRowsFromEqualityDeletes(records);
+    }
+
+    Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+    if (isDeletedFromEqDeletes == null) {
+      return keepRowsFromPosDeletes(records);
+    }
+
+    CloseableIterable<T> markedRecords;
+
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      markedRecords = CloseableIterable.transform(records, record -> {
+        if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
+          deleteMarker().accept(record);
+        }
+        return record;
+      });
+
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,
+          Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()), record -> {
+            if (!isDeletedRow(record) && isDeletedFromEqDeletes.test(record)) {
+              deleteMarker().accept(record);
+            }
+            return record;
+          });
+    }
+    return deletedRowsSelector().filter(markedRecords);
+  }

-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
-    // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate<T> deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
-
-    Filter<T> deletedRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
-      }
-    };
-
-    return deletedRowsFilter.filter(records);
+  private CloseableIterable<T> selectRowsFromDeletes(CloseableIterable<T> records, Predicate<T> isDeleted) {
+    CloseableIterable<T> markedRecords = CloseableIterable.transform(records, record -> {
+      if (isDeleted.test(record)) {
+        deleteMarker().accept(record);
+      }
+      return record;
+    });
+
+    return deletedRowsSelector().filter(markedRecords);
+  }
+
+  public CloseableIterable<T> keepRowsFromEqualityDeletes(CloseableIterable<T> records) {
+    // Predicate to test whether a row has been deleted by equality deletions.
+    Predicate<T> isDeleted = buildEqDeletePredicate();
+    if (isDeleted == null) {
+      return CloseableIterable.empty();
+    }
+
+    return selectRowsFromDeletes(records, isDeleted);
+  }
+
+  public CloseableIterable<T> keepRowsFromPosDeletes(CloseableIterable<T> records) {
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      // Predicate to test whether a row has been deleted by position deletes.
+      Predicate<T> isDeleted = buildPosDeletePredicate();
+      if (isDeleted == null) {
+        return CloseableIterable.empty();
+      }
+
+      return selectRowsFromDeletes(records, isDeleted);
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      CloseableIterable<T> markedRecords = Deletes.streamingDeletedRowMarker(records, this::pos,
+          Deletes.deletePositions(dataFile.path(), deletes), deleteMarker());
+
+      return deletedRowsSelector().filter(markedRecords);
+    }
   }
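keepRowsFromDeletes and keepRowsFromPosDeletes switch strategies on the total record count of the position-delete files: small delete sets are materialized in memory and probed per row, while larger ones are streamed and merge-joined against the position-ordered rows. A sketch of that decision, using an illustrative threshold value (the real one comes from the setFilterThreshold field, not shown in this hunk):

import java.util.List;
import org.apache.iceberg.DeleteFile;

// Sketch of the size-based strategy switch: delete-file record counts come
// from table metadata, so choosing a strategy costs no extra I/O.
class PosDeleteStrategy {
  static final long SET_FILTER_THRESHOLD = 100_000L; // illustrative value

  static boolean useInMemorySet(List<DeleteFile> posDeletes) {
    long totalDeletes = posDeletes.stream().mapToLong(DeleteFile::recordCount).sum();
    return totalDeletes < SET_FILTER_THRESHOLD;
  }
}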
   private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row should be visible to user after applying equality deletions.
-    Predicate<T> remainingRows = applyEqDeletes().stream()
-        .map(Predicate::negate)
-        .reduce(Predicate::and)
-        .orElse(t -> true);
+    Predicate<T> isDeleted = buildEqDeletePredicate();
+    if (isDeleted == null) {
+      return records;
+    }
+
+    CloseableIterable<T> markedRecords = CloseableIterable.transform(records, record -> {
+      if (isDeleted.test(record)) {
+        deleteMarker().accept(record);
+      }
+      return record;
+    });

     Filter<T> remainingRowsFilter = new Filter<T>() {
       @Override
       protected boolean shouldKeep(T item) {
-        return remainingRows.test(item);
+        return !isDeletedRow(item);
       }
     };

-    return remainingRowsFilter.filter(records);
+    return remainingRowsFilter.filter(markedRecords);
   }

   public Predicate<T> eqDeletedRowFilter() {
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java
index cae426a93a7f..789a04a9d9e0 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java
@@ -19,6 +19,7 @@

 package org.apache.iceberg.data;

+import java.util.function.Consumer;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
@@ -40,6 +41,16 @@ protected long pos(Record record) {
     return (Long) posAccessor().get(record);
   }

+  @Override
+  protected Consumer<Record> deleteMarker() {
+    return record -> record.set(deleteMarkerIndex(), true);
+  }
+
+  @Override
+  protected boolean isDeletedRow(Record record) {
+    return record.get(deleteMarkerIndex(), Boolean.class);
+  }
+
   @Override
   protected StructLike asStructLike(Record record) {
     return asStructLike.wrap(record);
@@ -49,4 +60,5 @@ protected StructLike asStructLike(Record record) {
   protected InputFile getInputFile(String location) {
     return io.newInputFile(location);
   }
+
 }
diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index b71f2b0fafe5..873b04890370 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -20,8 +20,11 @@
 package org.apache.iceberg.flink.source;

 import java.util.Map;
+import java.util.function.Consumer;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
@@ -178,6 +181,24 @@ private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
       this.inputFilesDecryptor = inputFilesDecryptor;
     }

+    @Override
+    protected Consumer<RowData> deleteMarker() {
+      return record -> {
+        if (record instanceof GenericRowData) {
+          ((GenericRowData) record).setField(deleteMarkerIndex(), true);
+        } else if (record instanceof UpdatableRowData) {
+          ((UpdatableRowData) record).setField(deleteMarkerIndex(), true);
+        } else {
+          throw new UnsupportedOperationException("Cannot mark row data");
+        }
+      };
+    }
+
+    @Override
+    protected boolean isDeletedRow(RowData row) {
+      return row.getBoolean(deleteMarkerIndex());
+    }
+
     public RowType requiredRowType() {
       return requiredRowType;
     }
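Every engine-specific marker above and below writes to the ordinal returned by deleteMarkerIndex(), which locates the IS_DELETED metadata column by counting the columns that precede it in the required schema. A sketch of that scan over field ids (illustrative names, not the real Types.NestedField API surface):

import java.util.List;

// Sketch of the deleteMarkerIndex() scan: the marker column's ordinal equals
// the number of columns before IS_DELETED in the projected schema.
class MarkerIndex {
  static int markerIndex(List<Integer> fieldIds, int isDeletedFieldId) {
    int index = 0;
    for (int fieldId : fieldIds) {
      if (fieldId == isDeletedFieldId) {
        break;
      }
      index++;
    }
    // == fieldIds.size() when IS_DELETED is absent, so callers must ensure
    // the metadata column was actually projected before using the result
    return index;
  }
}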
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index b71f2b0fafe5..873b04890370 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -20,8 +20,11 @@
 package org.apache.iceberg.flink.source;

 import java.util.Map;
+import java.util.function.Consumer;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.UpdatableRowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
@@ -178,6 +181,24 @@ private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
       this.inputFilesDecryptor = inputFilesDecryptor;
     }

+    @Override
+    protected Consumer<RowData> deleteMarker() {
+      return record -> {
+        if (record instanceof GenericRowData) {
+          ((GenericRowData) record).setField(deleteMarkerIndex(), true);
+        } else if (record instanceof UpdatableRowData) {
+          ((UpdatableRowData) record).setField(deleteMarkerIndex(), true);
+        } else {
+          throw new UnsupportedOperationException("Cannot mark row data");
+        }
+      };
+    }
+
+    @Override
+    protected boolean isDeletedRow(RowData row) {
+      return row.getBoolean(deleteMarkerIndex());
+    }
+
     public RowType requiredRowType() {
       return requiredRowType;
     }
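Flink's RowData is a read-only interface; only some implementations expose setters, which is why the marker above special-cases GenericRowData and UpdatableRowData. A sketch of the same consumer narrowed to GenericRowData (assumption: the scan materializes mutable rows), showing the boxed Boolean that setField expects:

import java.util.function.Consumer;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

// Sketch: mark a Flink row as deleted by writing the _deleted flag in place.
// GenericRowData.setField takes an Object, so the boolean is boxed; readers
// unbox it again through RowData.getBoolean.
class FlinkDeleteMarker {
  static Consumer<RowData> forIndex(int deleteMarkerIndex) {
    return row -> {
      if (row instanceof GenericRowData) {
        ((GenericRowData) row).setField(deleteMarkerIndex, Boolean.TRUE);
      } else {
        throw new UnsupportedOperationException(
            "Cannot mark row data of " + row.getClass().getName());
      }
    };
  }
}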
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
similarity index 69%
rename from spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
rename to spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
index ce2226f4f75e..d263eda09539 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -29,12 +30,15 @@
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;

-public class EqualityDeleteRowReader extends RowDataReader {
+public class DeleteRowReader extends RowDataReader {
   private final Schema expectedSchema;
+  private final FileContent deleteContent;

-  public EqualityDeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
-    super(task, table, table.schema(), caseSensitive);
+  public DeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema,
+      boolean caseSensitive, FileContent deleteContent) {
+    super(task, table, expectedSchema, caseSensitive);
     this.expectedSchema = expectedSchema;
+    this.deleteContent = deleteContent;
   }

   @Override
@@ -49,6 +53,14 @@ CloseableIterator<InternalRow> open(FileScanTask task) {
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

-    return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator();
+    if (deleteContent == null) {
+      return matches.keepRowsFromDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    }
+
+    if (deleteContent.equals(FileContent.EQUALITY_DELETES)) {
+      return matches.keepRowsFromEqualityDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    } else {
+      return matches.keepRowsFromPosDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    }
   }
 }
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 4f5962494feb..0bcc340de8be 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.source;

 import java.util.Map;
+import java.util.function.Consumer;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
@@ -186,6 +187,16 @@ protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
       this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
     }

+    @Override
+    protected Consumer<InternalRow> deleteMarker() {
+      return record -> record.setBoolean(deleteMarkerIndex(), true);
+    }
+
+    @Override
+    protected boolean isDeletedRow(InternalRow row) {
+      return row.getBoolean(deleteMarkerIndex());
+    }
+
     @Override
     protected StructLike asStructLike(InternalRow row) {
       return asStructLike.wrap(row);
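The Spark marker writes the _deleted flag in place through InternalRow's mutators. A sketch of the same pair of operations as a standalone helper, assuming the scan produces mutable rows (setBoolean may throw for InternalRow implementations that do not support updates):

import java.util.function.Consumer;
import org.apache.spark.sql.catalyst.InternalRow;

// Sketch: mark and read back the _deleted flag on a Spark row. The ordinal
// comes from deleteMarkerIndex() in the surrounding SparkDeleteFilter.
class SparkDeleteMarker {
  static Consumer<InternalRow> forIndex(int deleteMarkerIndex) {
    return row -> row.setBoolean(deleteMarkerIndex, true);
  }

  static boolean isDeletedRow(InternalRow row, int deleteMarkerIndex) {
    return row.getBoolean(deleteMarkerIndex);
  }
}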
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index e543a408e8ce..103436b7a4e8 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -49,6 +50,8 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkStructLike;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.sql.Dataset;
@@ -205,7 +208,8 @@ public void testReadEqualityDeleteRows() throws IOException {
         TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);

     for (CombinedScanTask task : tasks) {
-      try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table, table.schema(), false)) {
+      try (DeleteRowReader reader = new DeleteRowReader(task, table, table.schema(), false,
+          FileContent.EQUALITY_DELETES)) {
         while (reader.next()) {
           actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
         }
@@ -215,4 +219,96 @@ public void testReadEqualityDeleteRows() throws IOException {
     Assert.assertEquals("should include 4 deleted row", 4, actualRowSet.size());
     Assert.assertEquals("deleted row should be matched", expectedRowSet, actualRowSet);
   }
+
+  @Test
+  public void testReadPositionDeleteRows() throws IOException {
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 6L) // id = 122
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expectedRowSet = rowSetWithIds(29, 89, 122);
+
+    Types.StructType type = table.schema().asStruct();
+    StructLikeSet actualRowSet = StructLikeSet.create(type);
+
+    CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
+        table.newScan().planFiles(),
+        TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    for (CombinedScanTask task : tasks) {
+      try (DeleteRowReader reader = new DeleteRowReader(task, table, table.schema(), false,
+          FileContent.POSITION_DELETES)) {
+        while (reader.next()) {
+          actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
+        }
+      }
+    }
+
+    Assert.assertEquals("should include 3 deleted rows", 3, actualRowSet.size());
+    Assert.assertEquals("deleted rows should match", expectedRowSet, actualRowSet);
+  }
+
+  @Test
+  public void testReadDeleteRows() throws IOException {
+    Schema deleteSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "b"), // id = 43
+        dataDelete.copy("data", "f"), // id = 121
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDelete = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteSchema);
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 6L) // id = 122
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .addDeletes(eqDelete)
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expectedRowSet = rowSetWithIds(29, 43, 89, 121, 122);
+
+    Types.StructType type = table.schema().asStruct();
+    StructLikeSet actualRowSet = StructLikeSet.create(type);
+
+    CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
+        table.newScan().planFiles(),
+        TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    for (CombinedScanTask task : tasks) {
+      try (DeleteRowReader reader = new DeleteRowReader(task, table, table.schema(), false, null)) {
+        while (reader.next()) {
+          actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
+        }
+      }
+    }
+
+    Assert.assertEquals("should include 5 deleted rows", 5, actualRowSet.size());
+    Assert.assertEquals("deleted rows should match", expectedRowSet, actualRowSet);
+  }
 }
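testReadDeleteRows expects five rows, not six, even though id = 122 is hit by both a position delete and an equality delete: selection reads the single _deleted flag per row, so the result is a union rather than a concatenation of per-delete-type results. An illustrative re-computation of that expectation (hypothetical helper, not test code from this patch):

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Why 5 rows: pos deletes hit {29, 89, 122}, eq deletes hit {43, 121, 122},
// and a row marked by either (or both) kinds of delete is emitted once.
class ExpectedDeletedRows {
  static <T> Set<T> deletedOnce(List<T> rows, Predicate<T> posDeleted, Predicate<T> eqDeleted) {
    Set<T> result = new LinkedHashSet<>();
    for (T row : rows) {
      if (posDeleted.test(row) || eqDeleted.test(row)) {
        result.add(row); // union semantics: no duplicates across delete types
      }
    }
    return result;
  }
}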
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
similarity index 69%
rename from spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
rename to spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
index ce2226f4f75e..d263eda09539 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -29,12 +30,15 @@
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;

-public class EqualityDeleteRowReader extends RowDataReader {
+public class DeleteRowReader extends RowDataReader {
   private final Schema expectedSchema;
+  private final FileContent deleteContent;

-  public EqualityDeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
-    super(task, table, table.schema(), caseSensitive);
+  public DeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema,
+      boolean caseSensitive, FileContent deleteContent) {
+    super(task, table, expectedSchema, caseSensitive);
     this.expectedSchema = expectedSchema;
+    this.deleteContent = deleteContent;
   }

   @Override
@@ -49,6 +53,14 @@ CloseableIterator<InternalRow> open(FileScanTask task) {
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

-    return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator();
+    if (deleteContent == null) {
+      return matches.keepRowsFromDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    }
+
+    if (deleteContent.equals(FileContent.EQUALITY_DELETES)) {
+      return matches.keepRowsFromEqualityDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    } else {
+      return matches.keepRowsFromPosDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    }
   }
 }
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 4f5962494feb..0bcc340de8be 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.source;

 import java.util.Map;
+import java.util.function.Consumer;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
@@ -186,6 +187,16 @@ protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
       this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
     }

+    @Override
+    protected Consumer<InternalRow> deleteMarker() {
+      return record -> record.setBoolean(deleteMarkerIndex(), true);
+    }
+
+    @Override
+    protected boolean isDeletedRow(InternalRow row) {
+      return row.getBoolean(deleteMarkerIndex());
+    }
+
     @Override
     protected StructLike asStructLike(InternalRow row) {
       return asStructLike.wrap(row);
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index e543a408e8ce..e6abfde2a79c 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -205,7 +206,8 @@ public void testReadEqualityDeleteRows() throws IOException {
         TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);

     for (CombinedScanTask task : tasks) {
-      try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table, table.schema(), false)) {
+      try (DeleteRowReader reader = new DeleteRowReader(task, table, table.schema(), false,
+          FileContent.EQUALITY_DELETES)) {
         while (reader.next()) {
           actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
         }
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
similarity index 69%
rename from spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
rename to spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
index ce2226f4f75e..d263eda09539 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -29,12 +30,15 @@
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;

-public class EqualityDeleteRowReader extends RowDataReader {
+public class DeleteRowReader extends RowDataReader {
   private final Schema expectedSchema;
+  private final FileContent deleteContent;

-  public EqualityDeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
-    super(task, table, table.schema(), caseSensitive);
+  public DeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema,
+      boolean caseSensitive, FileContent deleteContent) {
+    super(task, table, expectedSchema, caseSensitive);
     this.expectedSchema = expectedSchema;
+    this.deleteContent = deleteContent;
   }

   @Override
@@ -49,6 +53,14 @@ CloseableIterator<InternalRow> open(FileScanTask task) {
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

-    return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator();
+    if (deleteContent == null) {
+      return matches.keepRowsFromDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    }
+
+    if (deleteContent.equals(FileContent.EQUALITY_DELETES)) {
+      return matches.keepRowsFromEqualityDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    } else {
+      return matches.keepRowsFromPosDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    }
   }
 }
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 4f5962494feb..0bcc340de8be 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.source;

 import java.util.Map;
+import java.util.function.Consumer;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
@@ -186,6 +187,16 @@ protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
       this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
     }

+    @Override
+    protected Consumer<InternalRow> deleteMarker() {
+      return record -> record.setBoolean(deleteMarkerIndex(), true);
+    }
+
+    @Override
+    protected boolean isDeletedRow(InternalRow row) {
+      return row.getBoolean(deleteMarkerIndex());
+    }
+
     @Override
     protected StructLike asStructLike(InternalRow row) {
       return asStructLike.wrap(row);
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index e543a408e8ce..e6abfde2a79c 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -205,7 +206,8 @@ public void testReadEqualityDeleteRows() throws IOException {
         TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);

     for (CombinedScanTask task : tasks) {
-      try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table, table.schema(), false)) {
+      try (DeleteRowReader reader = new DeleteRowReader(task, table, table.schema(), false,
+          FileContent.EQUALITY_DELETES)) {
         while (reader.next()) {
           actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
         }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 5f643fa37d5d..4d3795ee3552 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
@@ -145,6 +146,16 @@ private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
       this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
     }

+    @Override
+    protected Consumer<InternalRow> deleteMarker() {
+      return record -> record.setBoolean(deleteMarkerIndex(), true);
+    }
+
+    @Override
+    protected boolean isDeletedRow(InternalRow row) {
+      return row.getBoolean(deleteMarkerIndex());
+    }
+
     @Override
     protected StructLike asStructLike(InternalRow row) {
       return asStructLike.wrap(row);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
similarity index 69%
rename from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
rename to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
index ce2226f4f75e..d263eda09539 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -29,12 +30,15 @@
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;

-public class EqualityDeleteRowReader extends RowDataReader {
+public class DeleteRowReader extends RowDataReader {
   private final Schema expectedSchema;
+  private final FileContent deleteContent;

-  public EqualityDeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
-    super(task, table, table.schema(), caseSensitive);
+  public DeleteRowReader(CombinedScanTask task, Table table, Schema expectedSchema,
+      boolean caseSensitive, FileContent deleteContent) {
+    super(task, table, expectedSchema, caseSensitive);
     this.expectedSchema = expectedSchema;
+    this.deleteContent = deleteContent;
   }

   @Override
@@ -49,6 +53,14 @@ CloseableIterator<InternalRow> open(FileScanTask task) {
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

-    return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator();
+    if (deleteContent == null) {
+      return matches.keepRowsFromDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    }
+
+    if (deleteContent.equals(FileContent.EQUALITY_DELETES)) {
+      return matches.keepRowsFromEqualityDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    } else {
+      return matches.keepRowsFromPosDeletes(open(task, requiredSchema, idToConstant)).iterator();
+    }
   }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 4f5962494feb..0bcc340de8be 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark.source;

 import java.util.Map;
+import java.util.function.Consumer;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
@@ -186,6 +187,16 @@ protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
       this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
     }

+    @Override
+    protected Consumer<InternalRow> deleteMarker() {
+      return record -> record.setBoolean(deleteMarkerIndex(), true);
+    }
+
+    @Override
+    protected boolean isDeletedRow(InternalRow row) {
+      return row.getBoolean(deleteMarkerIndex());
+    }
+
     @Override
     protected StructLike asStructLike(InternalRow row) {
       return asStructLike.wrap(row);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 0b2dbf64fcfb..ddcb17a82906 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -228,7 +229,8 @@ public void testReadEqualityDeleteRows() throws IOException {
         TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);

     for (CombinedScanTask task : tasks) {
-      try (EqualityDeleteRowReader reader = new EqualityDeleteRowReader(task, table, table.schema(), false)) {
+      try (DeleteRowReader reader = new DeleteRowReader(task, table, table.schema(), false,
+          FileContent.EQUALITY_DELETES)) {
         while (reader.next()) {
           actualRowSet.add(new InternalRowWrapper(SparkSchemaUtil.convert(table.schema())).wrap(reader.get().copy()));
         }