diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index 401266ba42b4..2b3b0a7783a9 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -177,7 +177,7 @@ public <T> T get(int pos, Class<T> javaClass) {

     @Override
     public <T> void set(int pos, T value) {
-      throw new UnsupportedOperationException("Setting values is not supported");
+      values[pos] = value;
     }

     @Override
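The `Row.set` change above exists so the new delete-marker tests can flip a per-row flag in place instead of dropping the row. A minimal sketch of the pattern, assuming the api test classes are on the classpath and that column 2 of the test row holds the `_deleted` flag:

```java
import java.util.function.Consumer;
import org.apache.iceberg.TestHelpers.Row;

public class RowMarkerSketch {
  public static void main(String[] args) {
    Row row = Row.of(0L, "a", false);           // column 2 is the _deleted flag
    Consumer<Row> marker = r -> r.set(2, true); // now supported by Row.set
    marker.accept(row);
    System.out.println(row.get(2, Boolean.class)); // prints: true
  }
}
```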
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index af5d95339383..6d6aba2bbbe3 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -22,7 +22,9 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import org.apache.iceberg.Accessor;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
@@ -63,14 +65,18 @@ public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Functio
     return equalityFilter.filter(rows);
   }

-  public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
-                                                PositionDeleteIndex deleteSet) {
-    if (deleteSet.isEmpty()) {
-      return rows;
-    }
+  public static <T> CloseableIterable<T> markDeleted(CloseableIterable<T> rows, Predicate<T> isDeleted,
+                                                     Consumer<T> deleteMarker) {
+    return CloseableIterable.transform(rows, row -> {
+      if (isDeleted.test(row)) {
+        deleteMarker.accept(row);
+      }
+      return row;
+    });
+  }

-    PositionSetDeleteFilter<T> filter = new PositionSetDeleteFilter<>(rowToPosition, deleteSet);
-    return filter.filter(rows);
+  public static <T> CloseableIterable<T> filterDeleted(CloseableIterable<T> rows, Predicate<T> isDeleted) {
+    return CloseableIterable.filter(rows, isDeleted.negate());
   }

   public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
@@ -107,6 +113,13 @@ public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows
     return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
   }

+  public static <T> CloseableIterable<T> streamingMarker(CloseableIterable<T> rows,
+                                                         Function<T, Long> rowToPosition,
+                                                         CloseableIterable<Long> posDeletes,
+                                                         Consumer<T> markDeleted) {
+    return new PositionStreamDeleteMarker<>(rows, rowToPosition, posDeletes, markDeleted);
+  }
+
   public static CloseableIterable<Long> deletePositions(CharSequence dataLocation,
                                                         CloseableIterable<StructLike> deleteFile) {
     return deletePositions(dataLocation, ImmutableList.of(deleteFile));
@@ -137,93 +150,91 @@ protected boolean shouldKeep(T row) {
     }
   }

-  private static class PositionSetDeleteFilter<T> extends Filter<T> {
-    private final Function<T, Long> rowToPosition;
-    private final PositionDeleteIndex deleteSet;
-
-    private PositionSetDeleteFilter(Function<T, Long> rowToPosition, PositionDeleteIndex deleteSet) {
-      this.rowToPosition = rowToPosition;
-      this.deleteSet = deleteSet;
-    }
-
-    @Override
-    protected boolean shouldKeep(T row) {
-      return !deleteSet.isDeleted(rowToPosition.apply(row));
-    }
-  }
-
-  private static class PositionStreamDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {
+  private abstract static class PositionStreamDeleteIterable<T> extends CloseableGroup implements CloseableIterable<T> {
     private final CloseableIterable<T> rows;
-    private final Function<T, Long> extractPos;
-    private final CloseableIterable<Long> deletePositions;
+    private final CloseableIterator<Long> deletePosIterator;
+    private final Function<T, Long> rowToPosition;
+    private long nextDeletePos;

-    private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T, Long> extractPos,
-                                       CloseableIterable<Long> deletePositions) {
+    PositionStreamDeleteIterable(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
+                                 CloseableIterable<Long> deletePositions) {
       this.rows = rows;
-      this.extractPos = extractPos;
-      this.deletePositions = deletePositions;
+      this.rowToPosition = rowToPosition;
+      this.deletePosIterator = deletePositions.iterator();
     }

     @Override
     public CloseableIterator<T> iterator() {
-      CloseableIterator<Long> deletePosIterator = deletePositions.iterator();
-
       CloseableIterator<T> iter;
       if (deletePosIterator.hasNext()) {
-        iter = new PositionFilterIterator(rows.iterator(), deletePosIterator);
+        nextDeletePos = deletePosIterator.next();
+        iter = applyDelete(rows.iterator());
       } else {
         iter = rows.iterator();
-        try {
-          deletePosIterator.close();
-        } catch (IOException e) {
-          throw new UncheckedIOException("Failed to close delete positions iterator", e);
-        }
       }

       addCloseable(iter);
+      addCloseable(deletePosIterator);

       return iter;
     }

-    private class PositionFilterIterator extends FilterIterator<T> {
-      private final CloseableIterator<Long> deletePosIterator;
-      private long nextDeletePos;
+    boolean isDeleted(T row) {
+      long currentPos = rowToPosition.apply(row);
+      if (currentPos < nextDeletePos) {
+        return false;
+      }

-      protected PositionFilterIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
-        super(items);
-        this.deletePosIterator = deletePositions;
+      // consume delete positions until the next is past the current position
+      boolean isDeleted = currentPos == nextDeletePos;
+      while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
         this.nextDeletePos = deletePosIterator.next();
+        if (!isDeleted && currentPos == nextDeletePos) {
+          // if any delete position matches the current position
+          isDeleted = true;
+        }
       }

-      @Override
-      protected boolean shouldKeep(T row) {
-        long currentPos = extractPos.apply(row);
-        if (currentPos < nextDeletePos) {
-          return true;
-        }
+      return isDeleted;
+    }
+
+    protected abstract CloseableIterator<T> applyDelete(CloseableIterator<T> items);
+  }
+
+  private static class PositionStreamDeleteFilter<T> extends PositionStreamDeleteIterable<T> {
+    private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
+                                       CloseableIterable<Long> deletePositions) {
+      super(rows, rowToPosition, deletePositions);
+    }

-        // consume delete positions until the next is past the current position
-        boolean keep = currentPos != nextDeletePos;
-        while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
-          this.nextDeletePos = deletePosIterator.next();
-          if (keep && currentPos == nextDeletePos) {
-            // if any delete position matches the current position, discard
-            keep = false;
-          }
+    @Override
+    protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
+      return new FilterIterator<T>(items) {
+        @Override
+        protected boolean shouldKeep(T item) {
+          return !isDeleted(item);
         }
+      };
+    }
+  }

-        return keep;
-      }
+  private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteIterable<T> {
+    private final Consumer<T> markDeleted;
+
+    PositionStreamDeleteMarker(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
+                               CloseableIterable<Long> deletePositions, Consumer<T> markDeleted) {
+      super(rows, rowToPosition, deletePositions);
+      this.markDeleted = markDeleted;
+    }

-      @Override
-      public void close() {
-        super.close();
-        try {
-          deletePosIterator.close();
-        } catch (IOException e) {
-          throw new UncheckedIOException("Failed to close delete positions iterator", e);
+    @Override
+    protected CloseableIterator<T> applyDelete(CloseableIterator<T> items) {
+      return CloseableIterator.transform(items, row -> {
+        if (isDeleted(row)) {
+          markDeleted.accept(row);
         }
-      }
+        return row;
+      });
     }
   }
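The core of this change is `PositionStreamDeleteIterable.isDeleted`: both the row stream and the delete stream are ordered by position, so one forward pass over each suffices, and duplicate delete positions are consumed harmlessly. A standalone sketch of the same merge using plain iterators instead of Iceberg's `CloseableIterator` (all names here are illustrative):

```java
import java.util.Iterator;
import java.util.List;

public class StreamingDeleteSketch {
  private final Iterator<Long> deletes; // delete positions, ascending
  private long nextDeletePos;

  StreamingDeleteSketch(Iterator<Long> deletes) {
    this.deletes = deletes;
    this.nextDeletePos = deletes.hasNext() ? deletes.next() : Long.MAX_VALUE;
  }

  // must be called with strictly increasing row positions
  boolean isDeleted(long currentPos) {
    if (currentPos < nextDeletePos) {
      return false; // fast path: the next delete is still ahead of this row
    }
    boolean deleted = currentPos == nextDeletePos;
    // advance past duplicates and positions we have already passed
    while (deletes.hasNext() && nextDeletePos <= currentPos) {
      nextDeletePos = deletes.next();
      deleted = deleted || currentPos == nextDeletePos;
    }
    return deleted;
  }

  public static void main(String[] args) {
    StreamingDeleteSketch sketch = new StreamingDeleteSketch(List.of(0L, 3L, 3L, 4L).iterator());
    for (long pos = 0; pos < 6; pos++) {
      System.out.println(pos + " deleted? " + sketch.isDeleted(pos)); // true at 0, 3, 4
    }
  }
}
```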
diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
index 65e61ac688bd..dc255c7c4ab0 100644
--- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
+++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.deletes;

 import java.util.List;
+import java.util.function.Predicate;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TestHelpers.Row;
@@ -117,6 +118,33 @@ public void testPositionStreamRowFilter() {
         Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
   }

+  @Test
+  public void testPositionStreamRowDeleteMarker() {
+    CloseableIterable<StructLike> rows = CloseableIterable.withNoopClose(Lists.newArrayList(
+        Row.of(0L, "a", false),
+        Row.of(1L, "b", false),
+        Row.of(2L, "c", false),
+        Row.of(3L, "d", false),
+        Row.of(4L, "e", false),
+        Row.of(5L, "f", false),
+        Row.of(6L, "g", false),
+        Row.of(7L, "h", false),
+        Row.of(8L, "i", false),
+        Row.of(9L, "j", false)
+    ));
+
+    CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));
+
+    CloseableIterable<StructLike> actual = Deletes.streamingMarker(rows,
+        row -> row.get(0, Long.class), /* row to position */
+        deletes,
+        row -> row.set(2, true) /* delete marker */
+    );
+    Assert.assertEquals("Deleted rows should be marked",
+        Lists.newArrayList(true, false, false, true, true, false, false, true, false, true),
+        Lists.newArrayList(Iterables.transform(actual, row -> row.get(2, Boolean.class))));
+  }
+
   @Test
   public void testPositionStreamRowFilterWithDuplicates() {
     CloseableIterable<StructLike> rows = CloseableIterable.withNoopClose(Lists.newArrayList(
@@ -151,11 +179,11 @@ public void testPositionStreamRowFilterWithRowGaps() {
         Row.of(6L, "g")
     ));

-    CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));
+    CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 2L, 3L, 4L, 7L, 9L));

     CloseableIterable<StructLike> actual = Deletes.streamingFilter(rows, row -> row.get(0, Long.class), deletes);

     Assert.assertEquals("Filter should produce expected rows",
-        Lists.newArrayList(2L, 5L, 6L),
+        Lists.newArrayList(5L, 6L),
         Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
   }

@@ -216,9 +244,8 @@ public void testPositionSetRowFilter() {
     CloseableIterable<Long> deletes = CloseableIterable.withNoopClose(Lists.newArrayList(0L, 3L, 4L, 7L, 9L));

-    CloseableIterable<StructLike> actual = Deletes.filter(
-        rows, row -> row.get(0, Long.class),
-        Deletes.toPositionIndex(deletes));
+    Predicate<StructLike> shouldKeep = row -> !Deletes.toPositionIndex(deletes).isDeleted(row.get(0, Long.class));
+    CloseableIterable<StructLike> actual = CloseableIterable.filter(rows, shouldKeep);
     Assert.assertEquals("Filter should produce expected rows",
         Lists.newArrayList(1L, 2L, 5L, 6L, 8L),
         Lists.newArrayList(Iterables.transform(actual, row -> row.get(0, Long.class))));
@@ -254,9 +281,11 @@ public void testCombinedPositionSetRowFilter() {
         Row.of(9L, "j")
     ));

-    CloseableIterable<StructLike> actual = Deletes.filter(
-        rows, row -> row.get(0, Long.class),
-        Deletes.toPositionIndex("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2)));
+    Predicate<StructLike> isDeleted = row -> Deletes
+        .toPositionIndex("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))
+        .isDeleted(row.get(0, Long.class));
+
+    CloseableIterable<StructLike> actual = CloseableIterable.filter(rows, isDeleted.negate());

     Assert.assertEquals("Filter should produce expected rows",
         Lists.newArrayList(1L, 2L, 5L, 6L, 8L),
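One nit on the rewritten tests: the predicates above rebuild the position index inside the lambda, once per row. A suggested variant against the same test-local variables, building the index once so the test stays O(rows + deletes):

```java
// sketch only; uses the locals of testCombinedPositionSetRowFilter
PositionDeleteIndex index = Deletes.toPositionIndex(
    "file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2));
Predicate<StructLike> isDeleted = row -> index.isDeleted(row.get(0, Long.class));
CloseableIterable<StructLike> actual = CloseableIterable.filter(rows, isDeleted.negate());
```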
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 8d420487943b..3bb44a1e4e0f 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -51,7 +51,6 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.Filter;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.StructProjection;

@@ -67,6 +66,8 @@ public abstract class DeleteFilter<T> {
   private final List<DeleteFile> eqDeletes;
   private final Schema requiredSchema;
   private final Accessor<StructLike> posAccessor;
+  private final boolean hasIsDeletedColumn;
+  private final int isDeletedColumnPosition;

   private PositionDeleteIndex deleteRowPositions = null;
   private Predicate<T> eqDeleteRows = null;
@@ -94,6 +95,12 @@ protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSc
     this.eqDeletes = eqDeleteBuilder.build();
     this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
     this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+    this.hasIsDeletedColumn = requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
+    this.isDeletedColumnPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED);
+  }
+
+  protected int columnIsDeletedPosition() {
+    return isDeletedColumnPosition;
   }

   public Schema requiredSchema() {
@@ -169,30 +176,19 @@ public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records)
         .reduce(Predicate::or)
         .orElse(t -> false);

-    Filter<T> deletedRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
-      }
-    };
-    return deletedRowsFilter.filter(records);
+    return CloseableIterable.filter(records, deletedRows);
   }

   private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
-    // Predicate to test whether a row should be visible to user after applying equality deletions.
-    Predicate<T> remainingRows = applyEqDeletes().stream()
-        .map(Predicate::negate)
-        .reduce(Predicate::and)
-        .orElse(t -> true);
-
-    Filter<T> remainingRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return remainingRows.test(item);
-      }
-    };
+    Predicate<T> isEqDeleted = applyEqDeletes().stream()
+        .reduce(Predicate::or)
+        .orElse(t -> false);

-    return remainingRowsFilter.filter(records);
+    return createDeleteIterable(records, isEqDeleted);
+  }
+
+  protected void markRowDeleted(T item) {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does not implement markRowDeleted");
   }

   public Predicate<T> eqDeletedRowFilter() {
@@ -226,10 +222,20 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {

     // if there are fewer deletes than a reasonable number to keep in memory, use a set
     if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
-      return Deletes.filter(records, this::pos, Deletes.toPositionIndex(filePath, deletes));
+      PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, deletes);
+      Predicate<T> isDeleted = record -> positionIndex.isDeleted(pos(record));
+      return createDeleteIterable(records, isDeleted);
     }

-    return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes));
+    return hasIsDeletedColumn ?
+        Deletes.streamingMarker(records, this::pos, Deletes.deletePositions(filePath, deletes), this::markRowDeleted) :
+        Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes));
+  }
+
+  private CloseableIterable<T> createDeleteIterable(CloseableIterable<T> records, Predicate<T> isDeleted) {
+    return hasIsDeletedColumn ?
+        Deletes.markDeleted(records, isDeleted, this::markRowDeleted) :
+        Deletes.filterDeleted(records, isDeleted);
   }

   private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
@@ -290,8 +296,6 @@ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
       requiredIds.addAll(eqDelete.equalityFieldIds());
     }

-    requiredIds.add(MetadataColumns.IS_DELETED.fieldId());
-
     Set<Integer> missingIds = Sets.newLinkedHashSet(
         Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema)));
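`applyEqDeletes` now composes the equality-delete predicates with `reduce(Predicate::or)` into a single `isEqDeleted`, and `createDeleteIterable` decides what to do with it: mark rows when the projection carries `_deleted`, drop them otherwise. A minimal, self-contained model of that dispatch, with a plain `List` standing in for `CloseableIterable` (everything here is illustrative, not the Iceberg API):

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class DeleteDispatchSketch {
  // marking mode keeps every row and flags the matches; filtering mode drops the matches
  static <T> List<T> applyDeletes(List<T> rows, Predicate<T> isDeleted,
                                  boolean hasIsDeletedColumn, Consumer<T> markRowDeleted) {
    if (hasIsDeletedColumn) {
      rows.forEach(row -> {
        if (isDeleted.test(row)) {
          markRowDeleted.accept(row);
        }
      });
      return rows;
    }
    return rows.stream().filter(isDeleted.negate()).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<boolean[]> rows = List.of(new boolean[] {false}, new boolean[] {false});
    Predicate<boolean[]> isDeleted = row -> row == rows.get(0); // delete the first row
    applyDeletes(rows, isDeleted, true, row -> row[0] = true);  // mark: both survive
    System.out.println(rows.get(0)[0] + " " + rows.get(1)[0]);  // true false
    System.out.println(applyDeletes(rows, isDeleted, false, row -> { }).size()); // filter: 1
  }
}
```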
diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
index ff2964212347..0cdf209889b7 100644
--- a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
+++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
@@ -103,6 +103,30 @@ public void readIcebergVectorized() {
     });
   }

+  @Benchmark
+  @Threads(1)
+  public void readIcebergWithIsDeletedColumn() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = false");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readDeletedRows() {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 * 1024));
+    withTableProperties(tableProperties, () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg").load(tableLocation).filter("_deleted = true");
+      materialize(df);
+    });
+  }
+
   protected abstract void appendData() throws IOException;

   protected void writeData(int fileNum) {
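The two new benchmarks exercise both predicate directions on the metadata column. For reference, the same reads outside JMH (fragment; assumes a `SparkSession spark` and a `tableLocation` string):

```java
// live rows only: the reader takes the filtering path
Dataset<Row> live = spark.read().format("iceberg").load(tableLocation)
    .filter("_deleted = false");

// soft-deleted rows only: the reader takes the marking path
Dataset<Row> removed = spark.read().format("iceberg").load(tableLocation)
    .filter("_deleted = true");
```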
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 4f5962494feb..6ebceee2f62a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -195,5 +195,10 @@ protected StructLike asStructLike(InternalRow row) {
     protected InputFile getInputFile(String location) {
       return RowDataReader.this.getInputFile(location);
     }
+
+    @Override
+    protected void markRowDeleted(InternalRow row) {
+      row.setBoolean(columnIsDeletedPosition(), true);
+    }
   }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index a38e20fd32f3..a7f986491d4a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -198,7 +198,8 @@ public MetadataColumn[] metadataColumns() {
         new SparkMetadataColumn(MetadataColumns.SPEC_ID.name(), DataTypes.IntegerType, false),
         new SparkMetadataColumn(MetadataColumns.PARTITION_COLUMN_NAME, sparkPartitionType, true),
         new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false),
-        new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false)
+        new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false),
+        new SparkMetadataColumn(MetadataColumns.IS_DELETED.name(), DataTypes.BooleanType, false)
     };
   }
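`markRowDeleted` mutates the Spark row in place, which assumes the reader produces writable rows. A fragment showing the underlying operation on a `GenericInternalRow`; ordinal 2 is hypothetical, standing in for `columnIsDeletedPosition()`:

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

// fragment: a writable row with (id, data, _deleted)
InternalRow row = new GenericInternalRow(new Object[] {1, UTF8String.fromString("a"), false});
row.setBoolean(2, true); // flip the projected _deleted column
```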
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 0b2dbf64fcfb..aa3e193ac3af 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -21,12 +21,14 @@

 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -39,6 +41,7 @@
 import org.apache.iceberg.data.DeleteReadTests;
 import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hive.HiveCatalog;
@@ -46,9 +49,11 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkStructLike;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
@@ -57,14 +62,17 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
+import org.jetbrains.annotations.NotNull;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.iceberg.types.Types.NestedField.required;

 @RunWith(Parameterized.class)
 public class TestSparkReaderDeletes extends DeleteReadTests {
@@ -79,11 +87,8 @@ public TestSparkReaderDeletes(boolean vectorized) {
   }

   @Parameterized.Parameters(name = "vectorized = {0}")
-  public static Object[][] parameters() {
-    return new Object[][] {
-        new Object[] {false},
-        new Object[] {true}
-    };
+  public static Object[] parameters() {
+    return new Object[] {false, true};
   }

   @BeforeClass
@@ -129,6 +134,10 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) {
         .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
         .set(TableProperties.PARQUET_BATCH_SIZE, "4") // split 7 records to two batches to cover more code paths
         .commit();
+    } else {
+      table.updateProperties()
+          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false")
+          .commit();
     }
     return table;
   }
@@ -140,12 +149,15 @@ protected void dropTable(String name) {

   @Override
   public StructLikeSet rowSet(String name, Table table, String... columns) {
+    return rowSet(name, table.schema().select(columns).asStruct(), columns);
+  }
+
+  public StructLikeSet rowSet(String name, Types.StructType projection, String... columns) {
     Dataset<Row> df = spark.read()
         .format("iceberg")
         .load(TableIdentifier.of("default", name).toString())
         .selectExpr(columns);

-    Types.StructType projection = table.schema().select(columns).asStruct();
     StructLikeSet set = StructLikeSet.create(projection);
     df.collectAsList().forEach(row -> {
       SparkStructLike rowWrapper = new SparkStructLike(projection);
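The `_deleted` tests added below all read through the new `rowSet` overload with an explicit projection, since metadata columns are not part of `table.schema()`. A fragment of the call pattern (`PROJECTION_SCHEMA` is defined at the end of this test class):

```java
// id and data come from the table schema; _deleted comes from MetadataColumns.IS_DELETED
StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted");
```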
@@ -262,4 +274,208 @@ public void testPosDeletesAllRowsInBatch() throws IOException {

     Assert.assertEquals("Table should contain expected rows", expected, actual);
   }
+
+  @Test
+  public void testPosDeletesWithDeletedColumn() throws IOException {
+    Assume.assumeFalse(vectorized);
+
+    // read.parquet.vectorization.batch-size is set to 4, so the 4 rows in the first batch are all deleted.
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 1L), // id = 43
+        Pair.of(dataFile.path(), 2L), // id = 61
+        Pair.of(dataFile.path(), 3L)  // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = expectedRowSet(29, 43, 61, 89);
+    StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted");
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
+  public void testEqualityDeleteWithDeletedColumn() throws IOException {
+    Assume.assumeFalse(vectorized);
+
+    String tableName = table.name().substring(table.name().lastIndexOf(".") + 1);
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g")  // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    StructLikeSet expected = expectedRowSet(29, 89, 122);
+    StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted");
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
+  public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException {
+    Assume.assumeFalse(vectorized);
+
+    Schema dataSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(dataSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g")  // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, dataSchema);
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 5L)  // id = 121
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = expectedRowSet(29, 89, 121, 122);
+    StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted");
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
+  public void testFilterOnDeletedMetadataColumn() throws IOException {
+    Assume.assumeFalse(vectorized);
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 1L), // id = 43
+        Pair.of(dataFile.path(), 2L), // id = 61
+        Pair.of(dataFile.path(), 3L)  // id = 89
+    );
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    StructLikeSet expected = expectedRowSetWithNonDeletesOnly(29, 43, 61, 89);
+
+    // get non-deleted rows
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .select("id", "data", "_deleted")
+        .filter("_deleted = false");
+
+    Types.StructType projection = PROJECTION_SCHEMA.asStruct();
+    StructLikeSet actual = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actual.add(rowWrapper.wrap(row));
+    });
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+
+    StructLikeSet expectedDeleted = expectedRowSetWithDeletesOnly(29, 43, 61, 89);
+
+    // get deleted rows
+    df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .select("id", "data", "_deleted")
+        .filter("_deleted = true");
+
+    StructLikeSet actualDeleted = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actualDeleted.add(rowWrapper.wrap(row));
+    });
+
+    Assert.assertEquals("Table should contain expected rows", expectedDeleted, actualDeleted);
+  }
+
+  @Test
+  public void testIsDeletedColumnWithoutDeleteFile() {
+    Assume.assumeFalse(vectorized);
+
+    StructLikeSet expected = expectedRowSet();
+    StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted");
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  private static final Schema PROJECTION_SCHEMA = new Schema(
+      required(1, "id", Types.IntegerType.get()),
+      required(2, "data", Types.StringType.get()),
+      MetadataColumns.IS_DELETED
+  );
+
+  private static StructLikeSet expectedRowSet(int... idsToRemove) {
+    return expectedRowSet(false, false, idsToRemove);
+  }
+
+  private static StructLikeSet expectedRowSetWithDeletesOnly(int... idsToRemove) {
+    return expectedRowSet(false, true, idsToRemove);
+  }
+
+  private static StructLikeSet expectedRowSetWithNonDeletesOnly(int... idsToRemove) {
+    return expectedRowSet(true, false, idsToRemove);
+  }
+
+  private static StructLikeSet expectedRowSet(boolean removeDeleted, boolean removeNonDeleted, int... idsToRemove) {
+    Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
+    List<Record> records = recordsWithDeletedColumn();
+    // mark rows deleted
+    records.forEach(record -> {
+      if (deletedIds.contains(record.getField("id"))) {
+        record.setField(MetadataColumns.IS_DELETED.name(), true);
+      }
+    });
+
+    records.removeIf(record -> deletedIds.contains(record.getField("id")) && removeDeleted);
+    records.removeIf(record -> !deletedIds.contains(record.getField("id")) && removeNonDeleted);
+
+    StructLikeSet set = StructLikeSet.create(PROJECTION_SCHEMA.asStruct());
+    records.forEach(record -> set.add(new InternalRecordWrapper(PROJECTION_SCHEMA.asStruct()).wrap(record)));
+
+    return set;
+  }
+
+  @NotNull
+  private static List<Record> recordsWithDeletedColumn() {
+    List<Record> records = Lists.newArrayList();
+
+    // records all use IDs that are in bucket id_bucket=0
+    GenericRecord record = GenericRecord.create(PROJECTION_SCHEMA);
+    records.add(record.copy("id", 29, "data", "a", "_deleted", false));
+    records.add(record.copy("id", 43, "data", "b", "_deleted", false));
+    records.add(record.copy("id", 61, "data", "c", "_deleted", false));
+    records.add(record.copy("id", 89, "data", "d", "_deleted", false));
+    records.add(record.copy("id", 100, "data", "e", "_deleted", false));
+    records.add(record.copy("id", 121, "data", "f", "_deleted", false));
+    records.add(record.copy("id", 122, "data", "g", "_deleted", false));
+    return records;
+  }
 }
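A standalone check of the expectation in `testMixedPosAndEqDeletesWithDeletedColumn` above: equality deletes hit ids 29, 89, 122, and position deletes hit positions 3 and 5 (ids 89, 121), so the union {29, 89, 121, 122} ends up flagged. All names below are illustrative:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

public class MixedDeletesSketch {
  public static void main(String[] args) {
    int[] ids = {29, 43, 61, 89, 100, 121, 122};
    Set<Integer> eqDeleted = new HashSet<>(Arrays.asList(29, 89, 122));
    Set<Integer> posDeleted = new HashSet<>(Arrays.asList(89, 121)); // positions 3 and 5
    // a row is marked if either delete source hits it; id 89 is hit by both but marked once
    Predicate<Integer> isDeleted = id -> eqDeleted.contains(id) || posDeleted.contains(id);
    for (int id : ids) {
      System.out.println(id + " -> _deleted = " + isDeleted.test(id));
    }
  }
}
```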