diff --git a/core/src/main/java/org/apache/iceberg/deletes/DeleteCounter.java b/core/src/main/java/org/apache/iceberg/deletes/DeleteCounter.java new file mode 100644 index 000000000000..f6daa9b4d6e3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/DeleteCounter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.deletes; + +/** A counter to be used to count deletes as they are applied. */ +public class DeleteCounter { + + private long count = 0L; + + /** Increment the counter by one. */ + public void increment() { + count++; + } + + /** Return the current value of the counter. */ + public long get() { + return count; + } +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index bd4e03916a01..beec06e045d3 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -62,6 +62,14 @@ public static CloseableIterable filter( return equalityFilter.filter(rows); } + /** + * Returns the same rows that are input, while marking the deleted ones. + * + * @param rows the rows to process + * @param isDeleted a predicate that determines if a row is deleted + * @param deleteMarker a function that marks a row as deleted + * @return the processed rows + */ public static CloseableIterable markDeleted( CloseableIterable rows, Predicate isDeleted, Consumer deleteMarker) { return CloseableIterable.transform( @@ -70,13 +78,35 @@ public static CloseableIterable markDeleted( if (isDeleted.test(row)) { deleteMarker.accept(row); } + return row; }); } + /** + * Returns the remaining rows (the ones that are not deleted), while counting the deleted ones. 
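+ *
+ * <p>A minimal usage sketch (the {@code rows} iterable, the {@code isDeleted} predicate and the
+ * row type here are illustrative, not part of this change):
+ *
+ * <pre>{@code
+ * DeleteCounter counter = new DeleteCounter();
+ * CloseableIterable<Record> remaining = Deletes.filterDeleted(rows, isDeleted, counter);
+ * // counter.get() reflects the number of dropped rows once 'remaining' has been consumed
+ * }</pre>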
+ * + * @param rows the rows to process + * @param isDeleted a predicate that determines if a row is deleted + * @param counter a counter that counts deleted rows + * @return the processed rows + */ public static CloseableIterable filterDeleted( - CloseableIterable rows, Predicate isDeleted) { - return CloseableIterable.filter(rows, isDeleted.negate()); + CloseableIterable rows, Predicate isDeleted, DeleteCounter counter) { + Filter remainingRowsFilter = + new Filter() { + @Override + protected boolean shouldKeep(T item) { + boolean deleted = isDeleted.test(item); + if (deleted) { + counter.increment(); + } + + return !deleted; + } + }; + + return remainingRowsFilter.filter(rows); } public static StructLikeSet toEqualitySet( @@ -116,7 +146,15 @@ public static CloseableIterable streamingFilter( CloseableIterable rows, Function rowToPosition, CloseableIterable posDeletes) { - return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes); + return streamingFilter(rows, rowToPosition, posDeletes, new DeleteCounter()); + } + + public static CloseableIterable streamingFilter( + CloseableIterable rows, + Function rowToPosition, + CloseableIterable posDeletes, + DeleteCounter counter) { + return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes, counter); } public static CloseableIterable streamingMarker( @@ -215,11 +253,15 @@ boolean isDeleted(T row) { } private static class PositionStreamDeleteFilter extends PositionStreamDeleteIterable { - private PositionStreamDeleteFilter( + private final DeleteCounter counter; + + PositionStreamDeleteFilter( CloseableIterable rows, Function rowToPosition, - CloseableIterable deletePositions) { + CloseableIterable deletePositions, + DeleteCounter counter) { super(rows, rowToPosition, deletePositions); + this.counter = counter; } @Override @@ -227,7 +269,12 @@ protected CloseableIterator applyDelete(CloseableIterator items) { return new FilterIterator(items) { @Override protected boolean shouldKeep(T item) { - return !isDeleted(item); + boolean deleted = isDeleted(item); + if (deleted) { + counter.increment(); + } + + return !deleted; } }; } diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index f56748aeca04..a7979fd2ed3e 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -33,6 +33,7 @@ import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expressions; @@ -52,8 +53,11 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class DeleteFilter { + private static final Logger LOG = LoggerFactory.getLogger(DeleteFilter.class); private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L; private static final Schema POS_DELETE_SCHEMA = new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS); @@ -66,24 +70,32 @@ public abstract class DeleteFilter { private final Accessor posAccessor; private final boolean hasIsDeletedColumn; private final int isDeletedColumnPosition; + private final 
DeleteCounter counter; private PositionDeleteIndex deleteRowPositions = null; private List> isInDeleteSets = null; private Predicate eqDeleteRows = null; protected DeleteFilter( - String filePath, List deletes, Schema tableSchema, Schema requestedSchema) { + String filePath, + List deletes, + Schema tableSchema, + Schema requestedSchema, + DeleteCounter counter) { this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; this.filePath = filePath; + this.counter = counter; ImmutableList.Builder posDeleteBuilder = ImmutableList.builder(); ImmutableList.Builder eqDeleteBuilder = ImmutableList.builder(); for (DeleteFile delete : deletes) { switch (delete.content()) { case POSITION_DELETES: + LOG.debug("Adding position delete file {} to filter", delete.path()); posDeleteBuilder.add(delete); break; case EQUALITY_DELETES: + LOG.debug("Adding equality delete file {} to filter", delete.path()); eqDeleteBuilder.add(delete); break; default: @@ -101,6 +113,11 @@ protected DeleteFilter( this.isDeletedColumnPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED); } + protected DeleteFilter( + String filePath, List deletes, Schema tableSchema, Schema requestedSchema) { + this(filePath, deletes, tableSchema, requestedSchema, new DeleteCounter()); + } + protected int columnIsDeletedPosition() { return isDeletedColumnPosition; } @@ -117,6 +134,10 @@ public boolean hasEqDeletes() { return !eqDeletes.isEmpty(); } + public void incrementDeleteCount() { + counter.increment(); + } + Accessor posAccessor() { return posAccessor; } @@ -234,14 +255,15 @@ private CloseableIterable applyPosDeletes(CloseableIterable records) { return hasIsDeletedColumn ? Deletes.streamingMarker( records, this::pos, Deletes.deletePositions(filePath, deletes), this::markRowDeleted) - : Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes)); + : Deletes.streamingFilter( + records, this::pos, Deletes.deletePositions(filePath, deletes), counter); } private CloseableIterable createDeleteIterable( CloseableIterable records, Predicate isDeleted) { return hasIsDeletedColumn ? Deletes.markDeleted(records, isDeleted, this::markRowDeleted) - : Deletes.filterDeleted(records, isDeleted); + : Deletes.filterDeleted(records, isDeleted, counter); } private CloseableIterable openPosDeletes(DeleteFile file) { @@ -249,6 +271,7 @@ private CloseableIterable openPosDeletes(DeleteFile file) { } private CloseableIterable openDeletes(DeleteFile deleteFile, Schema deleteSchema) { + LOG.trace("Opening delete file {}", deleteFile.path()); InputFile input = getInputFile(deleteFile.path().toString()); switch (deleteFile.format()) { case AVRO: diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index 659ac895ff8e..8d8c5e531984 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -171,6 +171,26 @@ protected boolean expectPruned() { return true; } + protected boolean countDeletes() { + return false; + } + + /** + * This will only be called after calling rowSet(String, Table, String...), and only if + * countDeletes() is true. 
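+ *
+ * <p>Subclasses that report delete counts override both hooks. As a rough sketch (mirroring the
+ * Spark reader test in this change, where the count is read back from the "number of row deletes
+ * applied" SQL metric of the last executed query):
+ *
+ * <pre>{@code
+ * protected boolean countDeletes() {
+ *   return true;
+ * }
+ *
+ * protected long deleteCount() {
+ *   return Long.parseLong(lastExecutedMetricValue(spark, NumDeletes.DISPLAY_STRING));
+ * }
+ * }</pre>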
+ */ + protected long deleteCount() { + return 0L; + } + + protected void checkDeleteCount(long expectedDeletes) { + if (countDeletes()) { + long actualDeletes = deleteCount(); + Assert.assertEquals( + "Table should contain expected number of deletes", expectedDeletes, actualDeletes); + } + } + @Test public void testEqualityDeletes() throws IOException { Schema deleteRowSchema = table.schema().select("data"); @@ -192,6 +212,7 @@ public void testEqualityDeletes() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(3L); } @Test @@ -240,6 +261,7 @@ public void testEqualityDateDeletes() throws IOException { StructLikeSet actual = rowSet(dateTableName, dateTable, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(3L); } @Test @@ -270,6 +292,8 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException { Assert.assertEquals( "Table should contain expected rows", expected, selectColumns(actual, "id")); } + + checkDeleteCount(3L); } @Test @@ -281,6 +305,8 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { this.dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records); + // At this point, the table has two data files, with 7 and 8 rows respectively, of which all but + // one are in duplicate. table.newAppend().appendFile(dataFile).commit(); Schema deleteRowSchema = table.schema().select("data"); @@ -296,12 +322,14 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { FileHelpers.writeDeleteFile( table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema); + // At this point, 3 rows in the first data file and 4 rows in the second data file are deleted. table.newRowDelta().addDeletes(eqDeletes).commit(); StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(7L); } @Test @@ -326,6 +354,7 @@ public void testPositionDeletes() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(3L); } @Test @@ -363,6 +392,7 @@ public void testMultiplePosDeleteFiles() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(3L); } @Test @@ -400,6 +430,7 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(4L); } @Test @@ -435,6 +466,7 @@ public void testMultipleEqualityDeleteSchemas() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(4L); } @Test @@ -471,6 +503,7 @@ public void testEqualityDeleteByNull() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(1L); } private StructLikeSet selectColumns(StructLikeSet rows, String... 
columns) { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 9686b63d1858..f07d8c545e35 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -181,8 +181,12 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) { posDelRowIdMapping[currentRowId] = originalRowId; currentRowId++; - } else if (hasIsDeletedColumn) { - isDeleted[originalRowId] = true; + } else { + if (hasIsDeletedColumn) { + isDeleted[originalRowId] = true; + } + + deletes.incrementDeleteCount(); } originalRowId++; } @@ -203,6 +207,7 @@ int[] initEqDeleteRowIdMapping() { eqDeleteRowIdMapping[i] = i; } } + return eqDeleteRowIdMapping; } @@ -227,8 +232,12 @@ void applyEqDelete(ColumnarBatch columnarBatch) { // skip deleted rows by pointing to the next undeleted row Id rowIdMapping[currentRowId] = rowIdMapping[rowId]; currentRowId++; - } else if (hasIsDeletedColumn) { - isDeleted[rowIdMapping[rowId]] = true; + } else { + if (hasIsDeletedColumn) { + isDeleted[rowIdMapping[rowId]] = true; + } + + deletes.incrementDeleteCount(); } rowId++; diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 95bbaaca7cbd..2333cd734bbe 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -41,6 +41,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedInputFile; import org.apache.iceberg.io.CloseableIterator; @@ -77,6 +78,7 @@ abstract class BaseReader implements Closeable { private final NameMapping nameMapping; private final ScanTaskGroup taskGroup; private final Iterator tasks; + private final DeleteCounter counter; private Map lazyInputFiles; private CloseableIterator currentIterator; @@ -94,6 +96,7 @@ abstract class BaseReader implements Closeable { String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); this.nameMapping = nameMappingString != null ? 
NameMappingParser.fromJson(nameMappingString) : null; + this.counter = new DeleteCounter(); } protected abstract CloseableIterator open(TaskT task); @@ -116,6 +119,10 @@ protected Table table() { return table; } + protected DeleteCounter counter() { + return counter; + } + public boolean next() throws IOException { try { while (true) { @@ -244,8 +251,8 @@ protected static Object convertConstant(Type type, Object value) { protected class SparkDeleteFilter extends DeleteFilter { private final InternalRowWrapper asStructLike; - SparkDeleteFilter(String filePath, List deletes) { - super(filePath, deletes, table.schema(), expectedSchema); + SparkDeleteFilter(String filePath, List deletes, DeleteCounter counter) { + super(filePath, deletes, table.schema(), expectedSchema, counter); this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); } @@ -261,7 +268,10 @@ protected InputFile getInputFile(String location) { @Override protected void markRowDeleted(InternalRow row) { - row.setBoolean(columnIsDeletedPosition(), true); + if (!row.getBoolean(columnIsDeletedPosition())) { + row.setBoolean(columnIsDeletedPosition(), true); + counter().increment(); + } } } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 13755f0abc79..5be913ff4682 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -30,8 +30,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class BatchDataReader extends BaseBatchReader { + private static final Logger LOG = LoggerFactory.getLogger(BatchDataReader.class); + BatchDataReader( ScanTaskGroup task, Table table, @@ -49,6 +53,7 @@ protected Stream> referencedFiles(FileScanTask task) { @Override protected CloseableIterator open(FileScanTask task) { String filePath = task.file().path().toString(); + LOG.debug("Opening data file {}", filePath); // update the current file for Spark's filename() function InputFileBlockHolder.set(filePath, task.start(), task.length()); @@ -59,7 +64,9 @@ protected CloseableIterator open(FileScanTask task) { Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask"); SparkDeleteFilter deleteFilter = - task.deletes().isEmpty() ? null : new SparkDeleteFilter(filePath, task.deletes()); + task.deletes().isEmpty() + ? 
null + : new SparkDeleteFilter(filePath, task.deletes(), counter()); return newBatchIterable( inputFile, diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index 82ab8f360ef0..20f0893bcca3 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -91,13 +91,13 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes()); + SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); return deletes.filter(rows(task, deletes.requiredSchema())); } private CloseableIterable openDeletedDataFileScanTask(DeletedDataFileScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes()); + SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); return deletes.filter(rows(task, deletes.requiredSchema())); } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java index 5d61747e3dec..9689fd0e030d 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java @@ -37,7 +37,7 @@ public EqualityDeleteRowReader( @Override protected CloseableIterator open(FileScanTask task) { SparkDeleteFilter matches = - new SparkDeleteFilter(task.file().path().toString(), task.deletes()); + new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter()); // schema or rows returned by readers Schema requiredSchema = matches.requiredSchema(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 3778049cc71a..dfa7e45761e1 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -32,8 +32,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class RowDataReader extends BaseRowReader { + private static final Logger LOG = LoggerFactory.getLogger(RowDataReader.class); + RowDataReader( ScanTaskGroup task, Table table, Schema expectedSchema, boolean caseSensitive) { super(table, task, expectedSchema, caseSensitive); @@ -47,7 +51,8 @@ protected Stream> referencedFiles(FileScanTask task) { @Override protected CloseableIterator open(FileScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes()); + LOG.debug("Opening data file {}", filePath); + SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter()); // schema or rows returned by readers Schema requiredSchema = 
deleteFilter.requiredSchema(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index f26daa55b2b3..b17c26d6bbbe 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -38,10 +38,16 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.source.metrics.NumDeletes; +import org.apache.iceberg.spark.source.metrics.NumSplits; +import org.apache.iceberg.spark.source.metrics.TaskNumDeletes; +import org.apache.iceberg.spark.source.metrics.TaskNumSplits; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReader; @@ -166,6 +172,11 @@ public String description() { return String.format("%s [filters=%s]", table, filters); } + @Override + public CustomMetric[] supportedCustomMetrics() { + return new CustomMetric[] {new NumSplits(), new NumDeletes()}; + } + static class ReaderFactory implements PartitionReaderFactory { private final int batchSize; @@ -198,15 +209,41 @@ public boolean supportColumnarReads(InputPartition partition) { } private static class RowReader extends RowDataReader implements PartitionReader { + private long numSplits; + RowReader(ReadTask task) { super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive()); + numSplits = task.task.files().size(); + LOG.debug( + "Reading {} file split(s) for table {} using RowReader", numSplits, task.table().name()); + } + + @Override + public CustomTaskMetric[] currentMetricsValues() { + return new CustomTaskMetric[] { + new TaskNumSplits(numSplits), new TaskNumDeletes(counter().get()) + }; } } private static class BatchReader extends BatchDataReader implements PartitionReader { + private long numSplits; + BatchReader(ReadTask task, int batchSize) { super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize); + numSplits = task.task.files().size(); + LOG.debug( + "Reading {} file split(s) for table {} using BatchReader", + numSplits, + task.table().name()); + } + + @Override + public CustomTaskMetric[] currentMetricsValues() { + return new CustomTaskMetric[] { + new TaskNumSplits(numSplits), new TaskNumDeletes(counter().get()) + }; } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java new file mode 100644 index 000000000000..5654ae3ed514 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import java.text.NumberFormat; +import org.apache.spark.sql.connector.metric.CustomMetric; + +public class NumDeletes implements CustomMetric { + + public static final String DISPLAY_STRING = "number of row deletes applied"; + + @Override + public String name() { + return "numDeletes"; + } + + @Override + public String description() { + return DISPLAY_STRING; + } + + @Override + public String aggregateTaskMetrics(long[] taskMetrics) { + long sum = 0L; + for (int i = 0; i < taskMetrics.length; i++) { + sum += taskMetrics[i]; + } + + return NumberFormat.getIntegerInstance().format(sum); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java new file mode 100644 index 000000000000..b4f0fc7a4462 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import java.text.NumberFormat; +import org.apache.spark.sql.connector.metric.CustomMetric; + +public class NumSplits implements CustomMetric { + + @Override + public String name() { + return "numSplits"; + } + + @Override + public String description() { + return "number of file splits read"; + } + + @Override + public String aggregateTaskMetrics(long[] taskMetrics) { + long sum = 0L; + for (int i = 0; i < taskMetrics.length; i++) { + sum += taskMetrics[i]; + } + + return NumberFormat.getIntegerInstance().format(sum); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java new file mode 100644 index 000000000000..8c734ba9f022 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskNumDeletes implements CustomTaskMetric { + private final long value; + + public TaskNumDeletes(long value) { + this.value = value; + } + + @Override + public String name() { + return "numDeletes"; + } + + @Override + public long value() { + return value; + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java new file mode 100644 index 000000000000..d8cbc4db05bb --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskNumSplits implements CustomTaskMetric { + private final long value; + + public TaskNumSplits(long value) { + this.value = value; + } + + @Override + public String name() { + return "numSplits"; + } + + @Override + public long value() { + return value; + } +} diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java new file mode 100644 index 000000000000..3b350bc91e72 --- /dev/null +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.ui.SQLAppStatusStore; +import org.apache.spark.sql.execution.ui.SQLExecutionUIData; +import org.apache.spark.sql.execution.ui.SQLPlanMetric; +import org.junit.Assert; +import scala.Option; + +public class SparkSQLExecutionHelper { + + private SparkSQLExecutionHelper() {} + + /** + * Finds the value of a specified metric for the last SQL query that was executed. Metric values + * are stored in the `SQLAppStatusStore` as strings. + * + * @param spark SparkSession used to run the SQL query + * @param metricName name of the metric + * @return value of the metric + */ + public static String lastExecutedMetricValue(SparkSession spark, String metricName) { + SQLAppStatusStore statusStore = spark.sharedState().statusStore(); + SQLExecutionUIData lastExecution = statusStore.executionsList().last(); + Option sqlPlanMetric = + lastExecution.metrics().find(metric -> metric.name().equals(metricName)); + Assert.assertTrue( + String.format("Metric '%s' not found in last execution", metricName), + sqlPlanMetric.isDefined()); + long metricId = sqlPlanMetric.get().accumulatorId(); + + // Refresh metricValues, they will remain null until the execution is complete and metrics are + // aggregated + int attempts = 3; + while (lastExecution.metricValues() == null && attempts > 0) { + try { + Thread.sleep(100); + attempts--; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + lastExecution = statusStore.execution(lastExecution.executionId()).get(); + } + + Assert.assertNotNull("Metric values were not finalized", lastExecution.metricValues()); + String metricValue = lastExecution.metricValues().get(metricId).getOrElse(null); + Assert.assertNotNull(String.format("Metric '%s' was not finalized", metricName), metricValue); + return metricValue; + } +} diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 31ec21b3b0fe..4924f07bf198 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.source; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue; import static org.apache.iceberg.types.Types.NestedField.required; import java.io.IOException; @@ -54,6 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkStructLike; +import org.apache.iceberg.spark.source.metrics.NumDeletes; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.CharSequenceSet; @@ -78,15 +80,22 @@ public class TestSparkReaderDeletes extends DeleteReadTests { private 
static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; + private final String format; private final boolean vectorized; - public TestSparkReaderDeletes(boolean vectorized) { + public TestSparkReaderDeletes(String format, boolean vectorized) { + this.format = format; this.vectorized = vectorized; } - @Parameterized.Parameters(name = "vectorized = {0}") - public static Object[] parameters() { - return new Object[] {false, true}; + @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {"parquet", false}, + new Object[] {"parquet", true}, + new Object[] {"orc", false}, + new Object[] {"avro", false} + }; } @BeforeClass @@ -98,6 +107,7 @@ public static void startMetastoreAndSpark() { spark = SparkSession.builder() .master("local[2]") + .config("spark.appStateStore.asyncTracking.enable", false) .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) .enableHiveSupport() @@ -130,16 +140,21 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableOperations ops = ((BaseTable) table).operations(); TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); - if (vectorized) { - table - .updateProperties() - .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") - .set( - TableProperties.PARQUET_BATCH_SIZE, - "4") // split 7 records to two batches to cover more code paths - .commit(); - } else { - table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false").commit(); + table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); + if (format.equals("parquet") || format.equals("orc")) { + String vectorizationEnabled = + format.equals("parquet") + ? TableProperties.PARQUET_VECTORIZATION_ENABLED + : TableProperties.ORC_VECTORIZATION_ENABLED; + String batchSize = + format.equals("parquet") + ? TableProperties.PARQUET_BATCH_SIZE + : TableProperties.ORC_BATCH_SIZE; + table.updateProperties().set(vectorizationEnabled, String.valueOf(vectorized)).commit(); + if (vectorized) { + // split 7 records to two batches to cover more code paths + table.updateProperties().set(batchSize, "4").commit(); + } } return table; } @@ -149,6 +164,15 @@ protected void dropTable(String name) { catalog.dropTable(TableIdentifier.of("default", name)); } + protected boolean countDeletes() { + return true; + } + + @Override + protected long deleteCount() { + return Long.parseLong(lastExecutedMetricValue(spark, NumDeletes.DISPLAY_STRING)); + } + @Override public StructLikeSet rowSet(String name, Table table, String... 
columns) { return rowSet(name, table.schema().select(columns).asStruct(), columns); @@ -304,6 +328,7 @@ public void testPosDeletesAllRowsInBatch() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(4L); } @Test @@ -333,6 +358,7 @@ public void testPosDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + checkDeleteCount(4L); } @Test @@ -362,6 +388,7 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + checkDeleteCount(3L); } @Test @@ -405,6 +432,7 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + checkDeleteCount(4L); } @Test @@ -477,6 +505,7 @@ public void testIsDeletedColumnWithoutDeleteFile() { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + checkDeleteCount(0L); } private static final Schema PROJECTION_SCHEMA = diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 9686b63d1858..f07d8c545e35 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -181,8 +181,12 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) { posDelRowIdMapping[currentRowId] = originalRowId; currentRowId++; - } else if (hasIsDeletedColumn) { - isDeleted[originalRowId] = true; + } else { + if (hasIsDeletedColumn) { + isDeleted[originalRowId] = true; + } + + deletes.incrementDeleteCount(); } originalRowId++; } @@ -203,6 +207,7 @@ int[] initEqDeleteRowIdMapping() { eqDeleteRowIdMapping[i] = i; } } + return eqDeleteRowIdMapping; } @@ -227,8 +232,12 @@ void applyEqDelete(ColumnarBatch columnarBatch) { // skip deleted rows by pointing to the next undeleted row Id rowIdMapping[currentRowId] = rowIdMapping[rowId]; currentRowId++; - } else if (hasIsDeletedColumn) { - isDeleted[rowIdMapping[rowId]] = true; + } else { + if (hasIsDeletedColumn) { + isDeleted[rowIdMapping[rowId]] = true; + } + + deletes.incrementDeleteCount(); } rowId++; diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 95bbaaca7cbd..2333cd734bbe 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -41,6 +41,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.encryption.EncryptedFiles; import 
org.apache.iceberg.encryption.EncryptedInputFile; import org.apache.iceberg.io.CloseableIterator; @@ -77,6 +78,7 @@ abstract class BaseReader implements Closeable { private final NameMapping nameMapping; private final ScanTaskGroup taskGroup; private final Iterator tasks; + private final DeleteCounter counter; private Map lazyInputFiles; private CloseableIterator currentIterator; @@ -94,6 +96,7 @@ abstract class BaseReader implements Closeable { String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); this.nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null; + this.counter = new DeleteCounter(); } protected abstract CloseableIterator open(TaskT task); @@ -116,6 +119,10 @@ protected Table table() { return table; } + protected DeleteCounter counter() { + return counter; + } + public boolean next() throws IOException { try { while (true) { @@ -244,8 +251,8 @@ protected static Object convertConstant(Type type, Object value) { protected class SparkDeleteFilter extends DeleteFilter { private final InternalRowWrapper asStructLike; - SparkDeleteFilter(String filePath, List deletes) { - super(filePath, deletes, table.schema(), expectedSchema); + SparkDeleteFilter(String filePath, List deletes, DeleteCounter counter) { + super(filePath, deletes, table.schema(), expectedSchema, counter); this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); } @@ -261,7 +268,10 @@ protected InputFile getInputFile(String location) { @Override protected void markRowDeleted(InternalRow row) { - row.setBoolean(columnIsDeletedPosition(), true); + if (!row.getBoolean(columnIsDeletedPosition())) { + row.setBoolean(columnIsDeletedPosition(), true); + counter().increment(); + } } } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 13755f0abc79..5be913ff4682 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -30,8 +30,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class BatchDataReader extends BaseBatchReader { + private static final Logger LOG = LoggerFactory.getLogger(BatchDataReader.class); + BatchDataReader( ScanTaskGroup task, Table table, @@ -49,6 +53,7 @@ protected Stream> referencedFiles(FileScanTask task) { @Override protected CloseableIterator open(FileScanTask task) { String filePath = task.file().path().toString(); + LOG.debug("Opening data file {}", filePath); // update the current file for Spark's filename() function InputFileBlockHolder.set(filePath, task.start(), task.length()); @@ -59,7 +64,9 @@ protected CloseableIterator open(FileScanTask task) { Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask"); SparkDeleteFilter deleteFilter = - task.deletes().isEmpty() ? null : new SparkDeleteFilter(filePath, task.deletes()); + task.deletes().isEmpty() + ? 
null + : new SparkDeleteFilter(filePath, task.deletes(), counter()); return newBatchIterable( inputFile, diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index 82ab8f360ef0..20f0893bcca3 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -91,13 +91,13 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes()); + SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); return deletes.filter(rows(task, deletes.requiredSchema())); } private CloseableIterable openDeletedDataFileScanTask(DeletedDataFileScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes()); + SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); return deletes.filter(rows(task, deletes.requiredSchema())); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java index 5d61747e3dec..9689fd0e030d 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java @@ -37,7 +37,7 @@ public EqualityDeleteRowReader( @Override protected CloseableIterator open(FileScanTask task) { SparkDeleteFilter matches = - new SparkDeleteFilter(task.file().path().toString(), task.deletes()); + new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter()); // schema or rows returned by readers Schema requiredSchema = matches.requiredSchema(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 3778049cc71a..dfa7e45761e1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -32,8 +32,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class RowDataReader extends BaseRowReader { + private static final Logger LOG = LoggerFactory.getLogger(RowDataReader.class); + RowDataReader( ScanTaskGroup task, Table table, Schema expectedSchema, boolean caseSensitive) { super(table, task, expectedSchema, caseSensitive); @@ -47,7 +51,8 @@ protected Stream> referencedFiles(FileScanTask task) { @Override protected CloseableIterator open(FileScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes()); + LOG.debug("Opening data file {}", filePath); + SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter()); // schema or rows returned by readers Schema requiredSchema = 
deleteFilter.requiredSchema(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index f26daa55b2b3..b17c26d6bbbe 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -38,10 +38,16 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.source.metrics.NumDeletes; +import org.apache.iceberg.spark.source.metrics.NumSplits; +import org.apache.iceberg.spark.source.metrics.TaskNumDeletes; +import org.apache.iceberg.spark.source.metrics.TaskNumSplits; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReader; @@ -166,6 +172,11 @@ public String description() { return String.format("%s [filters=%s]", table, filters); } + @Override + public CustomMetric[] supportedCustomMetrics() { + return new CustomMetric[] {new NumSplits(), new NumDeletes()}; + } + static class ReaderFactory implements PartitionReaderFactory { private final int batchSize; @@ -198,15 +209,41 @@ public boolean supportColumnarReads(InputPartition partition) { } private static class RowReader extends RowDataReader implements PartitionReader { + private long numSplits; + RowReader(ReadTask task) { super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive()); + numSplits = task.task.files().size(); + LOG.debug( + "Reading {} file split(s) for table {} using RowReader", numSplits, task.table().name()); + } + + @Override + public CustomTaskMetric[] currentMetricsValues() { + return new CustomTaskMetric[] { + new TaskNumSplits(numSplits), new TaskNumDeletes(counter().get()) + }; } } private static class BatchReader extends BatchDataReader implements PartitionReader { + private long numSplits; + BatchReader(ReadTask task, int batchSize) { super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize); + numSplits = task.task.files().size(); + LOG.debug( + "Reading {} file split(s) for table {} using BatchReader", + numSplits, + task.table().name()); + } + + @Override + public CustomTaskMetric[] currentMetricsValues() { + return new CustomTaskMetric[] { + new TaskNumSplits(numSplits), new TaskNumDeletes(counter().get()) + }; } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java new file mode 100644 index 000000000000..5654ae3ed514 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import java.text.NumberFormat; +import org.apache.spark.sql.connector.metric.CustomMetric; + +public class NumDeletes implements CustomMetric { + + public static final String DISPLAY_STRING = "number of row deletes applied"; + + @Override + public String name() { + return "numDeletes"; + } + + @Override + public String description() { + return DISPLAY_STRING; + } + + @Override + public String aggregateTaskMetrics(long[] taskMetrics) { + long sum = 0L; + for (int i = 0; i < taskMetrics.length; i++) { + sum += taskMetrics[i]; + } + + return NumberFormat.getIntegerInstance().format(sum); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java new file mode 100644 index 000000000000..b4f0fc7a4462 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import java.text.NumberFormat; +import org.apache.spark.sql.connector.metric.CustomMetric; + +public class NumSplits implements CustomMetric { + + @Override + public String name() { + return "numSplits"; + } + + @Override + public String description() { + return "number of file splits read"; + } + + @Override + public String aggregateTaskMetrics(long[] taskMetrics) { + long sum = 0L; + for (int i = 0; i < taskMetrics.length; i++) { + sum += taskMetrics[i]; + } + + return NumberFormat.getIntegerInstance().format(sum); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java new file mode 100644 index 000000000000..8c734ba9f022 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskNumDeletes implements CustomTaskMetric { + private final long value; + + public TaskNumDeletes(long value) { + this.value = value; + } + + @Override + public String name() { + return "numDeletes"; + } + + @Override + public long value() { + return value; + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java new file mode 100644 index 000000000000..d8cbc4db05bb --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskNumSplits implements CustomTaskMetric { + private final long value; + + public TaskNumSplits(long value) { + this.value = value; + } + + @Override + public String name() { + return "numSplits"; + } + + @Override + public long value() { + return value; + } +} diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java new file mode 100644 index 000000000000..3b350bc91e72 --- /dev/null +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java new file mode 100644 index 000000000000..3b350bc91e72 --- /dev/null +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.ui.SQLAppStatusStore; +import org.apache.spark.sql.execution.ui.SQLExecutionUIData; +import org.apache.spark.sql.execution.ui.SQLPlanMetric; +import org.junit.Assert; +import scala.Option; + +public class SparkSQLExecutionHelper { + + private SparkSQLExecutionHelper() {} + + /** + * Finds the value of a specified metric for the last SQL query that was executed. Metric values + * are stored in the `SQLAppStatusStore` as strings. + * + * @param spark SparkSession used to run the SQL query + * @param metricName name of the metric + * @return value of the metric + */ + public static String lastExecutedMetricValue(SparkSession spark, String metricName) { + SQLAppStatusStore statusStore = spark.sharedState().statusStore(); + SQLExecutionUIData lastExecution = statusStore.executionsList().last(); + Option<SQLPlanMetric> sqlPlanMetric = + lastExecution.metrics().find(metric -> metric.name().equals(metricName)); + Assert.assertTrue( + String.format("Metric '%s' not found in last execution", metricName), + sqlPlanMetric.isDefined()); + long metricId = sqlPlanMetric.get().accumulatorId(); + + // Refresh metricValues, they will remain null until the execution is complete and metrics are + // aggregated + int attempts = 3; + while (lastExecution.metricValues() == null && attempts > 0) { + try { + Thread.sleep(100); + attempts--; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + lastExecution = statusStore.execution(lastExecution.executionId()).get(); + } + + Assert.assertNotNull("Metric values were not finalized", lastExecution.metricValues()); + String metricValue = lastExecution.metricValues().get(metricId).getOrElse(() -> null); + Assert.assertNotNull(String.format("Metric '%s' was not finalized", metricName), metricValue); + return metricValue; + } +} diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 31ec21b3b0fe..4924f07bf198 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.source; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue; import static org.apache.iceberg.types.Types.NestedField.required; import java.io.IOException; @@ -54,6 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkStructLike; +import org.apache.iceberg.spark.source.metrics.NumDeletes; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.CharSequenceSet; @@ -78,15 +80,22 @@ public class TestSparkReaderDeletes extends DeleteReadTests { private 
static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; + private final String format; private final boolean vectorized; - public TestSparkReaderDeletes(boolean vectorized) { + public TestSparkReaderDeletes(String format, boolean vectorized) { + this.format = format; this.vectorized = vectorized; } - @Parameterized.Parameters(name = "vectorized = {0}") - public static Object[] parameters() { - return new Object[] {false, true}; + @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {"parquet", false}, + new Object[] {"parquet", true}, + new Object[] {"orc", false}, + new Object[] {"avro", false} + }; } @BeforeClass @@ -98,6 +107,7 @@ public static void startMetastoreAndSpark() { spark = SparkSession.builder() .master("local[2]") + .config("spark.appStateStore.asyncTracking.enable", false) .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) .enableHiveSupport() @@ -130,16 +140,21 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableOperations ops = ((BaseTable) table).operations(); TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); - if (vectorized) { - table - .updateProperties() - .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") - .set( - TableProperties.PARQUET_BATCH_SIZE, - "4") // split 7 records to two batches to cover more code paths - .commit(); - } else { - table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false").commit(); + table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); + if (format.equals("parquet") || format.equals("orc")) { + String vectorizationEnabled = + format.equals("parquet") + ? TableProperties.PARQUET_VECTORIZATION_ENABLED + : TableProperties.ORC_VECTORIZATION_ENABLED; + String batchSize = + format.equals("parquet") + ? TableProperties.PARQUET_BATCH_SIZE + : TableProperties.ORC_BATCH_SIZE; + table.updateProperties().set(vectorizationEnabled, String.valueOf(vectorized)).commit(); + if (vectorized) { + // split 7 records to two batches to cover more code paths + table.updateProperties().set(batchSize, "4").commit(); + } } return table; } @@ -149,6 +164,15 @@ protected void dropTable(String name) { catalog.dropTable(TableIdentifier.of("default", name)); } + protected boolean countDeletes() { + return true; + } + + @Override + protected long deleteCount() { + return Long.parseLong(lastExecutedMetricValue(spark, NumDeletes.DISPLAY_STRING)); + } + @Override public StructLikeSet rowSet(String name, Table table, String... 
columns) { return rowSet(name, table.schema().select(columns).asStruct(), columns); @@ -304,6 +328,7 @@ public void testPosDeletesAllRowsInBatch() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + checkDeleteCount(4L); } @Test @@ -333,6 +358,7 @@ public void testPosDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + checkDeleteCount(4L); } @Test @@ -362,6 +388,7 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + checkDeleteCount(3L); } @Test @@ -405,6 +432,7 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + checkDeleteCount(4L); } @Test @@ -477,6 +505,7 @@ public void testIsDeletedColumnWithoutDeleteFile() { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + checkDeleteCount(0L); } private static final Schema PROJECTION_SCHEMA =