From eb0fe4e17c2e83e873059babf90b84e504dd9081 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Thu, 24 Mar 2022 11:06:28 -0700 Subject: [PATCH 1/8] Add custom metrics for number of file splits read and number of deletes applied (by a SparkScan). --- .../apache/iceberg/deletes/DeleteCounter.java | 35 ++++++++++ .../org/apache/iceberg/deletes/Deletes.java | 44 ++++++++++-- .../org/apache/iceberg/data/DeleteFilter.java | 29 +++++++- .../apache/iceberg/data/DeleteReadTests.java | 42 ++++++++++++ .../data/vectorized/ColumnarBatchReader.java | 9 +++ .../iceberg/spark/source/BaseReader.java | 11 ++- .../iceberg/spark/source/BatchDataReader.java | 9 ++- .../spark/source/EqualityDeleteRowReader.java | 2 +- .../iceberg/spark/source/RowDataReader.java | 7 +- .../iceberg/spark/source/SparkScan.java | 37 ++++++++++ .../spark/source/metrics/NumDeletes.java | 46 +++++++++++++ .../spark/source/metrics/NumSplits.java | 44 ++++++++++++ .../spark/source/metrics/TaskNumDeletes.java | 39 +++++++++++ .../spark/source/metrics/TaskNumSplits.java | 39 +++++++++++ .../spark/source/SparkSQLExecutionHelper.java | 68 +++++++++++++++++++ .../spark/source/TestSparkReaderDeletes.java | 32 +++++++-- 16 files changed, 474 insertions(+), 19 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/deletes/DeleteCounter.java create mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java create mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java create mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java create mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java create mode 100644 spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java diff --git a/core/src/main/java/org/apache/iceberg/deletes/DeleteCounter.java b/core/src/main/java/org/apache/iceberg/deletes/DeleteCounter.java new file mode 100644 index 000000000000..f6daa9b4d6e3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/DeleteCounter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.deletes; + +/** A counter to be used to count deletes as they are applied. */ +public class DeleteCounter { + + private long count = 0L; + + /** Increment the counter by one. */ + public void increment() { + count++; + } + + /** Return the current value of the counter. 
*/ + public long get() { + return count; + } +} diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index bd4e03916a01..6485ee05744c 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -62,6 +62,7 @@ public static CloseableIterable filter( return equalityFilter.filter(rows); } + /** Returns the same rows that are input, while marking the deleted ones. */ public static CloseableIterable markDeleted( CloseableIterable rows, Predicate isDeleted, Consumer deleteMarker) { return CloseableIterable.transform( @@ -74,9 +75,24 @@ public static CloseableIterable markDeleted( }); } + /** + * Returns the remaining rows (the ones that are not deleted), while counting the deleted ones. + */ public static CloseableIterable filterDeleted( - CloseableIterable rows, Predicate isDeleted) { - return CloseableIterable.filter(rows, isDeleted.negate()); + CloseableIterable rows, Predicate isDeleted, DeleteCounter counter) { + Filter remainingRowsFilter = + new Filter() { + @Override + protected boolean shouldKeep(T item) { + boolean deleted = isDeleted.test(item); + if (deleted) { + counter.increment(); + } + return !deleted; + } + }; + + return remainingRowsFilter.filter(rows); } public static StructLikeSet toEqualitySet( @@ -116,7 +132,15 @@ public static CloseableIterable streamingFilter( CloseableIterable rows, Function rowToPosition, CloseableIterable posDeletes) { - return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes); + return streamingFilter(rows, rowToPosition, posDeletes, new DeleteCounter()); + } + + public static CloseableIterable streamingFilter( + CloseableIterable rows, + Function rowToPosition, + CloseableIterable posDeletes, + DeleteCounter counter) { + return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes, counter); } public static CloseableIterable streamingMarker( @@ -215,11 +239,15 @@ boolean isDeleted(T row) { } private static class PositionStreamDeleteFilter extends PositionStreamDeleteIterable { - private PositionStreamDeleteFilter( + private final DeleteCounter counter; + + PositionStreamDeleteFilter( CloseableIterable rows, Function rowToPosition, - CloseableIterable deletePositions) { + CloseableIterable deletePositions, + DeleteCounter counter) { super(rows, rowToPosition, deletePositions); + this.counter = counter; } @Override @@ -227,7 +255,11 @@ protected CloseableIterator applyDelete(CloseableIterator items) { return new FilterIterator(items) { @Override protected boolean shouldKeep(T item) { - return !isDeleted(item); + boolean deleted = isDeleted(item); + if (deleted) { + counter.increment(); + } + return !deleted; } }; } diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index f56748aeca04..a7979fd2ed3e 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -33,6 +33,7 @@ import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.deletes.PositionDeleteIndex; import org.apache.iceberg.expressions.Expressions; @@ -52,8 +53,11 @@ import org.apache.iceberg.types.Types; import 
org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class DeleteFilter { + private static final Logger LOG = LoggerFactory.getLogger(DeleteFilter.class); private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L; private static final Schema POS_DELETE_SCHEMA = new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS); @@ -66,24 +70,32 @@ public abstract class DeleteFilter { private final Accessor posAccessor; private final boolean hasIsDeletedColumn; private final int isDeletedColumnPosition; + private final DeleteCounter counter; private PositionDeleteIndex deleteRowPositions = null; private List> isInDeleteSets = null; private Predicate eqDeleteRows = null; protected DeleteFilter( - String filePath, List deletes, Schema tableSchema, Schema requestedSchema) { + String filePath, + List deletes, + Schema tableSchema, + Schema requestedSchema, + DeleteCounter counter) { this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; this.filePath = filePath; + this.counter = counter; ImmutableList.Builder posDeleteBuilder = ImmutableList.builder(); ImmutableList.Builder eqDeleteBuilder = ImmutableList.builder(); for (DeleteFile delete : deletes) { switch (delete.content()) { case POSITION_DELETES: + LOG.debug("Adding position delete file {} to filter", delete.path()); posDeleteBuilder.add(delete); break; case EQUALITY_DELETES: + LOG.debug("Adding equality delete file {} to filter", delete.path()); eqDeleteBuilder.add(delete); break; default: @@ -101,6 +113,11 @@ protected DeleteFilter( this.isDeletedColumnPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED); } + protected DeleteFilter( + String filePath, List deletes, Schema tableSchema, Schema requestedSchema) { + this(filePath, deletes, tableSchema, requestedSchema, new DeleteCounter()); + } + protected int columnIsDeletedPosition() { return isDeletedColumnPosition; } @@ -117,6 +134,10 @@ public boolean hasEqDeletes() { return !eqDeletes.isEmpty(); } + public void incrementDeleteCount() { + counter.increment(); + } + Accessor posAccessor() { return posAccessor; } @@ -234,14 +255,15 @@ private CloseableIterable applyPosDeletes(CloseableIterable records) { return hasIsDeletedColumn ? Deletes.streamingMarker( records, this::pos, Deletes.deletePositions(filePath, deletes), this::markRowDeleted) - : Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes)); + : Deletes.streamingFilter( + records, this::pos, Deletes.deletePositions(filePath, deletes), counter); } private CloseableIterable createDeleteIterable( CloseableIterable records, Predicate isDeleted) { return hasIsDeletedColumn ? 
Deletes.markDeleted(records, isDeleted, this::markRowDeleted) - : Deletes.filterDeleted(records, isDeleted); + : Deletes.filterDeleted(records, isDeleted, counter); } private CloseableIterable openPosDeletes(DeleteFile file) { @@ -249,6 +271,7 @@ private CloseableIterable openPosDeletes(DeleteFile file) { } private CloseableIterable openDeletes(DeleteFile deleteFile, Schema deleteSchema) { + LOG.trace("Opening delete file {}", deleteFile.path()); InputFile input = getInputFile(deleteFile.path().toString()); switch (deleteFile.format()) { case AVRO: diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index 659ac895ff8e..cdd0268e3977 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -171,6 +171,26 @@ protected boolean expectPruned() { return true; } + protected boolean countDeletes() { + return false; + } + + /** + * This will only be called after calling rowSet(String, Table, String...), and only if + * countDeletes() is true. + */ + protected long deleteCount() { + return 0L; + } + + protected void checkDeleteCount(long expectedDeletes) { + if (countDeletes()) { + long actualDeletes = deleteCount(); + Assert.assertEquals( + "Table should contain expected number of deletes", expectedDeletes, actualDeletes); + } + } + @Test public void testEqualityDeletes() throws IOException { Schema deleteRowSchema = table.schema().select("data"); @@ -192,6 +212,8 @@ public void testEqualityDeletes() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = 3L; + checkDeleteCount(expectedDeletes); } @Test @@ -240,6 +262,8 @@ public void testEqualityDateDeletes() throws IOException { StructLikeSet actual = rowSet(dateTableName, dateTable, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = 3L; + checkDeleteCount(expectedDeletes); } @Test @@ -270,6 +294,9 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException { Assert.assertEquals( "Table should contain expected rows", expected, selectColumns(actual, "id")); } + + long expectedDeletes = 3L; + checkDeleteCount(expectedDeletes); } @Test @@ -282,6 +309,7 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records); table.newAppend().appendFile(dataFile).commit(); + // At this point, the table has 7 + 8 = 15 rows, of which all but one are in duplicate. Schema deleteRowSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteRowSchema); @@ -297,11 +325,15 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema); table.newRowDelta().addDeletes(eqDeletes).commit(); + // At ths point, the table has (7 - 3) + (8 - 4) = 8 rows. 7 rows in all are deleted. 
StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = + 7L; // 3 deletes in the first data file and 4 deletes in the second data file + checkDeleteCount(expectedDeletes); } @Test @@ -326,6 +358,8 @@ public void testPositionDeletes() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = 3L; + checkDeleteCount(expectedDeletes); } @Test @@ -363,6 +397,8 @@ public void testMultiplePosDeleteFiles() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = 3L; + checkDeleteCount(expectedDeletes); } @Test @@ -400,6 +436,8 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = 4L; + checkDeleteCount(expectedDeletes); } @Test @@ -435,6 +473,8 @@ public void testMultipleEqualityDeleteSchemas() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = 4L; + checkDeleteCount(expectedDeletes); } @Test @@ -471,6 +511,8 @@ public void testEqualityDeleteByNull() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = 1L; + checkDeleteCount(expectedDeletes); } private StructLikeSet selectColumns(StructLikeSet rows, String... columns) { diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 9686b63d1858..b67da43773cd 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -35,6 +35,8 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized @@ -42,6 +44,7 @@ * populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}. 
*/ public class ColumnarBatchReader extends BaseBatchReader { + private static final Logger LOG = LoggerFactory.getLogger(ColumnarBatchReader.class); private final boolean hasIsDeletedColumn; private DeleteFilter deletes = null; private long rowStartPosInBatch = 0; @@ -170,6 +173,7 @@ Pair posDelRowIdMapping() { * @return the mapping array and the new num of rows in a batch, null if no row is deleted */ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) { + LOG.debug("Building row id mapping from positional deletes"); if (deletedRowPositions == null) { return null; } @@ -183,6 +187,8 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit currentRowId++; } else if (hasIsDeletedColumn) { isDeleted[originalRowId] = true; + } else { + deletes.incrementDeleteCount(); } originalRowId++; } @@ -217,6 +223,7 @@ int[] initEqDeleteRowIdMapping() { * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete */ void applyEqDelete(ColumnarBatch columnarBatch) { + LOG.debug("Applying equality deletes to row id mapping"); Iterator it = columnarBatch.rowIterator(); int rowId = 0; int currentRowId = 0; @@ -229,6 +236,8 @@ void applyEqDelete(ColumnarBatch columnarBatch) { currentRowId++; } else if (hasIsDeletedColumn) { isDeleted[rowIdMapping[rowId]] = true; + } else { + deletes.incrementDeleteCount(); } rowId++; diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 95bbaaca7cbd..6e2219a8e329 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -41,6 +41,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedInputFile; import org.apache.iceberg.io.CloseableIterator; @@ -77,6 +78,7 @@ abstract class BaseReader implements Closeable { private final NameMapping nameMapping; private final ScanTaskGroup taskGroup; private final Iterator tasks; + private final DeleteCounter counter; private Map lazyInputFiles; private CloseableIterator currentIterator; @@ -94,6 +96,7 @@ abstract class BaseReader implements Closeable { String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); this.nameMapping = nameMappingString != null ? 
NameMappingParser.fromJson(nameMappingString) : null; + this.counter = new DeleteCounter(); } protected abstract CloseableIterator open(TaskT task); @@ -116,6 +119,10 @@ protected Table table() { return table; } + protected DeleteCounter counter() { + return counter; + } + public boolean next() throws IOException { try { while (true) { @@ -244,8 +251,8 @@ protected static Object convertConstant(Type type, Object value) { protected class SparkDeleteFilter extends DeleteFilter { private final InternalRowWrapper asStructLike; - SparkDeleteFilter(String filePath, List deletes) { - super(filePath, deletes, table.schema(), expectedSchema); + SparkDeleteFilter(String filePath, List deletes, DeleteCounter counter) { + super(filePath, deletes, table.schema(), expectedSchema, counter); this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 13755f0abc79..5be913ff4682 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -30,8 +30,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class BatchDataReader extends BaseBatchReader { + private static final Logger LOG = LoggerFactory.getLogger(BatchDataReader.class); + BatchDataReader( ScanTaskGroup task, Table table, @@ -49,6 +53,7 @@ protected Stream> referencedFiles(FileScanTask task) { @Override protected CloseableIterator open(FileScanTask task) { String filePath = task.file().path().toString(); + LOG.debug("Opening data file {}", filePath); // update the current file for Spark's filename() function InputFileBlockHolder.set(filePath, task.start(), task.length()); @@ -59,7 +64,9 @@ protected CloseableIterator open(FileScanTask task) { Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask"); SparkDeleteFilter deleteFilter = - task.deletes().isEmpty() ? null : new SparkDeleteFilter(filePath, task.deletes()); + task.deletes().isEmpty() + ? 
null + : new SparkDeleteFilter(filePath, task.deletes(), counter()); return newBatchIterable( inputFile, diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java index 5d61747e3dec..9689fd0e030d 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java @@ -37,7 +37,7 @@ public EqualityDeleteRowReader( @Override protected CloseableIterator open(FileScanTask task) { SparkDeleteFilter matches = - new SparkDeleteFilter(task.file().path().toString(), task.deletes()); + new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter()); // schema or rows returned by readers Schema requiredSchema = matches.requiredSchema(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 3778049cc71a..dfa7e45761e1 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -32,8 +32,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class RowDataReader extends BaseRowReader { + private static final Logger LOG = LoggerFactory.getLogger(RowDataReader.class); + RowDataReader( ScanTaskGroup task, Table table, Schema expectedSchema, boolean caseSensitive) { super(table, task, expectedSchema, caseSensitive); @@ -47,7 +51,8 @@ protected Stream> referencedFiles(FileScanTask task) { @Override protected CloseableIterator open(FileScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes()); + LOG.debug("Opening data file {}", filePath); + SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter()); // schema or rows returned by readers Schema requiredSchema = deleteFilter.requiredSchema(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index f26daa55b2b3..b17c26d6bbbe 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -38,10 +38,16 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.source.metrics.NumDeletes; +import org.apache.iceberg.spark.source.metrics.NumSplits; +import org.apache.iceberg.spark.source.metrics.TaskNumDeletes; +import org.apache.iceberg.spark.source.metrics.TaskNumSplits; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.InputPartition; 
import org.apache.spark.sql.connector.read.PartitionReader; @@ -166,6 +172,11 @@ public String description() { return String.format("%s [filters=%s]", table, filters); } + @Override + public CustomMetric[] supportedCustomMetrics() { + return new CustomMetric[] {new NumSplits(), new NumDeletes()}; + } + static class ReaderFactory implements PartitionReaderFactory { private final int batchSize; @@ -198,15 +209,41 @@ public boolean supportColumnarReads(InputPartition partition) { } private static class RowReader extends RowDataReader implements PartitionReader { + private long numSplits; + RowReader(ReadTask task) { super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive()); + numSplits = task.task.files().size(); + LOG.debug( + "Reading {} file split(s) for table {} using RowReader", numSplits, task.table().name()); + } + + @Override + public CustomTaskMetric[] currentMetricsValues() { + return new CustomTaskMetric[] { + new TaskNumSplits(numSplits), new TaskNumDeletes(counter().get()) + }; } } private static class BatchReader extends BatchDataReader implements PartitionReader { + private long numSplits; + BatchReader(ReadTask task, int batchSize) { super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize); + numSplits = task.task.files().size(); + LOG.debug( + "Reading {} file split(s) for table {} using BatchReader", + numSplits, + task.table().name()); + } + + @Override + public CustomTaskMetric[] currentMetricsValues() { + return new CustomTaskMetric[] { + new TaskNumSplits(numSplits), new TaskNumDeletes(counter().get()) + }; } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java new file mode 100644 index 000000000000..dd65baf3c95c --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source.metrics; + +import java.text.NumberFormat; +import org.apache.spark.sql.connector.metric.CustomMetric; + +public class NumDeletes implements CustomMetric { + + public static final String DISPLAY_STRING = "number of row deletes applied"; + + @Override + public String name() { + return "numDeletes"; + } + + @Override + public String description() { + return DISPLAY_STRING; + } + + @Override + public String aggregateTaskMetrics(long[] taskMetrics) { + long sum = initialValue; + for (int i = 0; i < taskMetrics.length; i++) { + sum += taskMetrics[i]; + } + return NumberFormat.getIntegerInstance().format(sum); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java new file mode 100644 index 000000000000..96236382b510 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import java.text.NumberFormat; +import org.apache.spark.sql.connector.metric.CustomMetric; + +public class NumSplits implements CustomMetric { + + @Override + public String name() { + return "numSplits"; + } + + @Override + public String description() { + return "number of file splits read"; + } + + @Override + public String aggregateTaskMetrics(long[] taskMetrics) { + long sum = initialValue; + for (int i = 0; i < taskMetrics.length; i++) { + sum += taskMetrics[i]; + } + return NumberFormat.getIntegerInstance().format(sum); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java new file mode 100644 index 000000000000..8c734ba9f022 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskNumDeletes implements CustomTaskMetric { + private final long value; + + public TaskNumDeletes(long value) { + this.value = value; + } + + @Override + public String name() { + return "numDeletes"; + } + + @Override + public long value() { + return value; + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java new file mode 100644 index 000000000000..d8cbc4db05bb --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskNumSplits implements CustomTaskMetric { + private final long value; + + public TaskNumSplits(long value) { + this.value = value; + } + + @Override + public String name() { + return "numSplits"; + } + + @Override + public long value() { + return value; + } +} diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java new file mode 100644 index 000000000000..5fed8d305dec --- /dev/null +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.ui.SQLAppStatusStore; +import org.apache.spark.sql.execution.ui.SQLExecutionUIData; +import org.apache.spark.sql.execution.ui.SQLPlanMetric; +import org.junit.Assert; +import scala.Option; + +public class SparkSQLExecutionHelper { + + private SparkSQLExecutionHelper() {} + + /** + * Finds the value of a specified metric for the last SQL query that was executed. Metric values + * are stored in the `SQLAppStatusStore` as strings. + * + * @param spark SparkSession used to run the SQL query + * @param metricName name of the metric + * @return value of the metric + */ + public static String lastExecutedMetricValue(SparkSession spark, String metricName) { + SQLAppStatusStore statusStore = spark.sharedState().statusStore(); + SQLExecutionUIData lastExecution = statusStore.executionsList().last(); + Option sqlPlanMetric = + lastExecution.metrics().find(metric -> metric.name().equals(metricName)); + Assert.assertTrue( + String.format("Metric '%s' not found in last execution", metricName), + sqlPlanMetric.isDefined()); + long metricId = sqlPlanMetric.get().accumulatorId(); + + // Refresh metricValues, they will remain null until the execution is complete and metrics are + // aggregated + int attempts = 3; + while (lastExecution.metricValues() == null && attempts > 0) { + try { + Thread.sleep(100); + attempts -= 1; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + lastExecution = statusStore.execution(lastExecution.executionId()).get(); + } + + Assert.assertNotNull("Metric values were not finalized", lastExecution.metricValues()); + String metricValue = lastExecution.metricValues().get(metricId).getOrElse(null); + Assert.assertNotNull(String.format("Metric '%s' was not finalized", metricName), metricValue); + return metricValue; + } +} diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 31ec21b3b0fe..1b88d8b134e3 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.source; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue; import static org.apache.iceberg.types.Types.NestedField.required; import java.io.IOException; @@ -54,6 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkStructLike; +import org.apache.iceberg.spark.source.metrics.NumDeletes; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.CharSequenceSet; @@ -78,15 +80,22 @@ public class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; + private final String format; private final boolean vectorized; - public TestSparkReaderDeletes(boolean vectorized) { + public TestSparkReaderDeletes(String format, boolean vectorized) { + this.format = format; this.vectorized = vectorized; } - @Parameterized.Parameters(name = "vectorized 
= {0}") - public static Object[] parameters() { - return new Object[] {false, true}; + @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {"parquet", false}, + new Object[] {"parquet", true}, + new Object[] {"orc", false}, + new Object[] {"avro", false} + }; } @BeforeClass @@ -98,6 +107,7 @@ public static void startMetastoreAndSpark() { spark = SparkSession.builder() .master("local[2]") + .config("spark.appStateStore.asyncTracking.enable", false) .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) .enableHiveSupport() @@ -130,7 +140,8 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableOperations ops = ((BaseTable) table).operations(); TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); - if (vectorized) { + table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); + if (format.equals("parquet") && vectorized) { table .updateProperties() .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") @@ -149,6 +160,15 @@ protected void dropTable(String name) { catalog.dropTable(TableIdentifier.of("default", name)); } + protected boolean countDeletes() { + return true; + } + + @Override + protected long deleteCount() { + return Long.parseLong(lastExecutedMetricValue(spark, NumDeletes.DISPLAY_STRING)); + } + @Override public StructLikeSet rowSet(String name, Table table, String... columns) { return rowSet(name, table.schema().select(columns).asStruct(), columns); @@ -304,6 +324,8 @@ public void testPosDeletesAllRowsInBatch() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = 4L; + checkDeleteCount(expectedDeletes); } @Test From b4bb242d7d937679a65e5836c74d30efcbcdae5a Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Thu, 4 Aug 2022 16:15:51 -0700 Subject: [PATCH 2/8] Port changes to Spark 3.3. 
--- .../data/vectorized/ColumnarBatchReader.java | 9 +++ .../iceberg/spark/source/BaseReader.java | 11 ++- .../iceberg/spark/source/BatchDataReader.java | 9 ++- .../spark/source/EqualityDeleteRowReader.java | 2 +- .../iceberg/spark/source/RowDataReader.java | 7 +- .../iceberg/spark/source/SparkScan.java | 37 ++++++++++ .../spark/source/metrics/NumDeletes.java | 46 +++++++++++++ .../spark/source/metrics/NumSplits.java | 44 ++++++++++++ .../spark/source/metrics/TaskNumDeletes.java | 39 +++++++++++ .../spark/source/metrics/TaskNumSplits.java | 39 +++++++++++ .../spark/source/SparkSQLExecutionHelper.java | 68 +++++++++++++++++++ .../spark/source/TestSparkReaderDeletes.java | 32 +++++++-- 12 files changed, 333 insertions(+), 10 deletions(-) create mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java create mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java create mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java create mode 100644 spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java create mode 100644 spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 9686b63d1858..b67da43773cd 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -35,6 +35,8 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized @@ -42,6 +44,7 @@ * populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}. 
*/ public class ColumnarBatchReader extends BaseBatchReader { + private static final Logger LOG = LoggerFactory.getLogger(ColumnarBatchReader.class); private final boolean hasIsDeletedColumn; private DeleteFilter deletes = null; private long rowStartPosInBatch = 0; @@ -170,6 +173,7 @@ Pair posDelRowIdMapping() { * @return the mapping array and the new num of rows in a batch, null if no row is deleted */ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) { + LOG.debug("Building row id mapping from positional deletes"); if (deletedRowPositions == null) { return null; } @@ -183,6 +187,8 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit currentRowId++; } else if (hasIsDeletedColumn) { isDeleted[originalRowId] = true; + } else { + deletes.incrementDeleteCount(); } originalRowId++; } @@ -217,6 +223,7 @@ int[] initEqDeleteRowIdMapping() { * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete */ void applyEqDelete(ColumnarBatch columnarBatch) { + LOG.debug("Applying equality deletes to row id mapping"); Iterator it = columnarBatch.rowIterator(); int rowId = 0; int currentRowId = 0; @@ -229,6 +236,8 @@ void applyEqDelete(ColumnarBatch columnarBatch) { currentRowId++; } else if (hasIsDeletedColumn) { isDeleted[rowIdMapping[rowId]] = true; + } else { + deletes.incrementDeleteCount(); } rowId++; diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 95bbaaca7cbd..6e2219a8e329 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -41,6 +41,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedInputFile; import org.apache.iceberg.io.CloseableIterator; @@ -77,6 +78,7 @@ abstract class BaseReader implements Closeable { private final NameMapping nameMapping; private final ScanTaskGroup taskGroup; private final Iterator tasks; + private final DeleteCounter counter; private Map lazyInputFiles; private CloseableIterator currentIterator; @@ -94,6 +96,7 @@ abstract class BaseReader implements Closeable { String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); this.nameMapping = nameMappingString != null ? 
NameMappingParser.fromJson(nameMappingString) : null; + this.counter = new DeleteCounter(); } protected abstract CloseableIterator open(TaskT task); @@ -116,6 +119,10 @@ protected Table table() { return table; } + protected DeleteCounter counter() { + return counter; + } + public boolean next() throws IOException { try { while (true) { @@ -244,8 +251,8 @@ protected static Object convertConstant(Type type, Object value) { protected class SparkDeleteFilter extends DeleteFilter { private final InternalRowWrapper asStructLike; - SparkDeleteFilter(String filePath, List deletes) { - super(filePath, deletes, table.schema(), expectedSchema); + SparkDeleteFilter(String filePath, List deletes, DeleteCounter counter) { + super(filePath, deletes, table.schema(), expectedSchema, counter); this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 13755f0abc79..5be913ff4682 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -30,8 +30,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class BatchDataReader extends BaseBatchReader { + private static final Logger LOG = LoggerFactory.getLogger(BatchDataReader.class); + BatchDataReader( ScanTaskGroup task, Table table, @@ -49,6 +53,7 @@ protected Stream> referencedFiles(FileScanTask task) { @Override protected CloseableIterator open(FileScanTask task) { String filePath = task.file().path().toString(); + LOG.debug("Opening data file {}", filePath); // update the current file for Spark's filename() function InputFileBlockHolder.set(filePath, task.start(), task.length()); @@ -59,7 +64,9 @@ protected CloseableIterator open(FileScanTask task) { Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask"); SparkDeleteFilter deleteFilter = - task.deletes().isEmpty() ? null : new SparkDeleteFilter(filePath, task.deletes()); + task.deletes().isEmpty() + ? 
null + : new SparkDeleteFilter(filePath, task.deletes(), counter()); return newBatchIterable( inputFile, diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java index 5d61747e3dec..9689fd0e030d 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java @@ -37,7 +37,7 @@ public EqualityDeleteRowReader( @Override protected CloseableIterator open(FileScanTask task) { SparkDeleteFilter matches = - new SparkDeleteFilter(task.file().path().toString(), task.deletes()); + new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter()); // schema or rows returned by readers Schema requiredSchema = matches.requiredSchema(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 3778049cc71a..dfa7e45761e1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -32,8 +32,12 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.spark.rdd.InputFileBlockHolder; import org.apache.spark.sql.catalyst.InternalRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class RowDataReader extends BaseRowReader { + private static final Logger LOG = LoggerFactory.getLogger(RowDataReader.class); + RowDataReader( ScanTaskGroup task, Table table, Schema expectedSchema, boolean caseSensitive) { super(table, task, expectedSchema, caseSensitive); @@ -47,7 +51,8 @@ protected Stream> referencedFiles(FileScanTask task) { @Override protected CloseableIterator open(FileScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes()); + LOG.debug("Opening data file {}", filePath); + SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter()); // schema or rows returned by readers Schema requiredSchema = deleteFilter.requiredSchema(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index f26daa55b2b3..b17c26d6bbbe 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -38,10 +38,16 @@ import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.source.metrics.NumDeletes; +import org.apache.iceberg.spark.source.metrics.NumSplits; +import org.apache.iceberg.spark.source.metrics.TaskNumDeletes; +import org.apache.iceberg.spark.source.metrics.TaskNumSplits; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.InputPartition; 
import org.apache.spark.sql.connector.read.PartitionReader; @@ -166,6 +172,11 @@ public String description() { return String.format("%s [filters=%s]", table, filters); } + @Override + public CustomMetric[] supportedCustomMetrics() { + return new CustomMetric[] {new NumSplits(), new NumDeletes()}; + } + static class ReaderFactory implements PartitionReaderFactory { private final int batchSize; @@ -198,15 +209,41 @@ public boolean supportColumnarReads(InputPartition partition) { } private static class RowReader extends RowDataReader implements PartitionReader { + private long numSplits; + RowReader(ReadTask task) { super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive()); + numSplits = task.task.files().size(); + LOG.debug( + "Reading {} file split(s) for table {} using RowReader", numSplits, task.table().name()); + } + + @Override + public CustomTaskMetric[] currentMetricsValues() { + return new CustomTaskMetric[] { + new TaskNumSplits(numSplits), new TaskNumDeletes(counter().get()) + }; } } private static class BatchReader extends BatchDataReader implements PartitionReader { + private long numSplits; + BatchReader(ReadTask task, int batchSize) { super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize); + numSplits = task.task.files().size(); + LOG.debug( + "Reading {} file split(s) for table {} using BatchReader", + numSplits, + task.table().name()); + } + + @Override + public CustomTaskMetric[] currentMetricsValues() { + return new CustomTaskMetric[] { + new TaskNumSplits(numSplits), new TaskNumDeletes(counter().get()) + }; } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java new file mode 100644 index 000000000000..dd65baf3c95c --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source.metrics; + +import java.text.NumberFormat; +import org.apache.spark.sql.connector.metric.CustomMetric; + +public class NumDeletes implements CustomMetric { + + public static final String DISPLAY_STRING = "number of row deletes applied"; + + @Override + public String name() { + return "numDeletes"; + } + + @Override + public String description() { + return DISPLAY_STRING; + } + + @Override + public String aggregateTaskMetrics(long[] taskMetrics) { + long sum = initialValue; + for (int i = 0; i < taskMetrics.length; i++) { + sum += taskMetrics[i]; + } + return NumberFormat.getIntegerInstance().format(sum); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java new file mode 100644 index 000000000000..96236382b510 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import java.text.NumberFormat; +import org.apache.spark.sql.connector.metric.CustomMetric; + +public class NumSplits implements CustomMetric { + + @Override + public String name() { + return "numSplits"; + } + + @Override + public String description() { + return "number of file splits read"; + } + + @Override + public String aggregateTaskMetrics(long[] taskMetrics) { + long sum = initialValue; + for (int i = 0; i < taskMetrics.length; i++) { + sum += taskMetrics[i]; + } + return NumberFormat.getIntegerInstance().format(sum); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java new file mode 100644 index 000000000000..8c734ba9f022 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumDeletes.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskNumDeletes implements CustomTaskMetric { + private final long value; + + public TaskNumDeletes(long value) { + this.value = value; + } + + @Override + public String name() { + return "numDeletes"; + } + + @Override + public long value() { + return value; + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java new file mode 100644 index 000000000000..d8cbc4db05bb --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskNumSplits.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskNumSplits implements CustomTaskMetric { + private final long value; + + public TaskNumSplits(long value) { + this.value = value; + } + + @Override + public String name() { + return "numSplits"; + } + + @Override + public long value() { + return value; + } +} diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java new file mode 100644 index 000000000000..5fed8d305dec --- /dev/null +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.ui.SQLAppStatusStore; +import org.apache.spark.sql.execution.ui.SQLExecutionUIData; +import org.apache.spark.sql.execution.ui.SQLPlanMetric; +import org.junit.Assert; +import scala.Option; + +public class SparkSQLExecutionHelper { + + private SparkSQLExecutionHelper() {} + + /** + * Finds the value of a specified metric for the last SQL query that was executed. Metric values + * are stored in the `SQLAppStatusStore` as strings. + * + * @param spark SparkSession used to run the SQL query + * @param metricName name of the metric + * @return value of the metric + */ + public static String lastExecutedMetricValue(SparkSession spark, String metricName) { + SQLAppStatusStore statusStore = spark.sharedState().statusStore(); + SQLExecutionUIData lastExecution = statusStore.executionsList().last(); + Option sqlPlanMetric = + lastExecution.metrics().find(metric -> metric.name().equals(metricName)); + Assert.assertTrue( + String.format("Metric '%s' not found in last execution", metricName), + sqlPlanMetric.isDefined()); + long metricId = sqlPlanMetric.get().accumulatorId(); + + // Refresh metricValues, they will remain null until the execution is complete and metrics are + // aggregated + int attempts = 3; + while (lastExecution.metricValues() == null && attempts > 0) { + try { + Thread.sleep(100); + attempts -= 1; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + lastExecution = statusStore.execution(lastExecution.executionId()).get(); + } + + Assert.assertNotNull("Metric values were not finalized", lastExecution.metricValues()); + String metricValue = lastExecution.metricValues().get(metricId).getOrElse(null); + Assert.assertNotNull(String.format("Metric '%s' was not finalized", metricName), metricValue); + return metricValue; + } +} diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 31ec21b3b0fe..1b88d8b134e3 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.source; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue; import static org.apache.iceberg.types.Types.NestedField.required; import java.io.IOException; @@ -54,6 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkStructLike; +import org.apache.iceberg.spark.source.metrics.NumDeletes; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.CharSequenceSet; @@ -78,15 +80,22 @@ public class TestSparkReaderDeletes extends DeleteReadTests { private static TestHiveMetastore metastore = null; protected static SparkSession spark = null; protected static HiveCatalog catalog = null; + private final String format; private final boolean vectorized; - public TestSparkReaderDeletes(boolean vectorized) { + public TestSparkReaderDeletes(String format, boolean vectorized) { + this.format = format; this.vectorized = vectorized; } - @Parameterized.Parameters(name = "vectorized 
= {0}") - public static Object[] parameters() { - return new Object[] {false, true}; + @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {"parquet", false}, + new Object[] {"parquet", true}, + new Object[] {"orc", false}, + new Object[] {"avro", false} + }; } @BeforeClass @@ -98,6 +107,7 @@ public static void startMetastoreAndSpark() { spark = SparkSession.builder() .master("local[2]") + .config("spark.appStateStore.asyncTracking.enable", false) .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) .enableHiveSupport() @@ -130,7 +140,8 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableOperations ops = ((BaseTable) table).operations(); TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); - if (vectorized) { + table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); + if (format.equals("parquet") && vectorized) { table .updateProperties() .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") @@ -149,6 +160,15 @@ protected void dropTable(String name) { catalog.dropTable(TableIdentifier.of("default", name)); } + protected boolean countDeletes() { + return true; + } + + @Override + protected long deleteCount() { + return Long.parseLong(lastExecutedMetricValue(spark, NumDeletes.DISPLAY_STRING)); + } + @Override public StructLikeSet rowSet(String name, Table table, String... columns) { return rowSet(name, table.schema().select(columns).asStruct(), columns); @@ -304,6 +324,8 @@ public void testPosDeletesAllRowsInBatch() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); + long expectedDeletes = 4L; + checkDeleteCount(expectedDeletes); } @Test From 482e84c1126cf743d87cd8a98ad0e1214ffd5e1f Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Mon, 22 Aug 2022 12:08:22 -0700 Subject: [PATCH 3/8] Address some feedback from Yufei Gu. Remove debug logging from ColumnarBatchReader. Explicitly disable vectorization for columnar file formats (orc, parquet) in the table properties when not testing the vectorized case. 
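For context on how the new assertions above are meant to be used, a minimal test-side sketch (illustrative only; it reuses the helper and metric classes added earlier in this series, and the expected count of 4 is just an example value):

    // Illustrative only: read the delete metric that the last executed SQL query
    // reported to the SQL UI and compare it with the expected number of deletes.
    long deletesApplied =
        Long.parseLong(lastExecutedMetricValue(spark, NumDeletes.DISPLAY_STRING));
    Assert.assertEquals("Unexpected number of deletes applied", 4L, deletesApplied);
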
--- .../iceberg/spark/data/vectorized/ColumnarBatchReader.java | 5 ----- .../apache/iceberg/spark/source/TestSparkReaderDeletes.java | 4 +++- .../iceberg/spark/data/vectorized/ColumnarBatchReader.java | 5 ----- .../apache/iceberg/spark/source/TestSparkReaderDeletes.java | 4 +++- 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index b67da43773cd..71118ef2153e 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -35,8 +35,6 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized @@ -44,7 +42,6 @@ * populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}. */ public class ColumnarBatchReader extends BaseBatchReader { - private static final Logger LOG = LoggerFactory.getLogger(ColumnarBatchReader.class); private final boolean hasIsDeletedColumn; private DeleteFilter deletes = null; private long rowStartPosInBatch = 0; @@ -173,7 +170,6 @@ Pair posDelRowIdMapping() { * @return the mapping array and the new num of rows in a batch, null if no row is deleted */ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) { - LOG.debug("Building row id mapping from positional deletes"); if (deletedRowPositions == null) { return null; } @@ -223,7 +219,6 @@ int[] initEqDeleteRowIdMapping() { * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete */ void applyEqDelete(ColumnarBatch columnarBatch) { - LOG.debug("Applying equality deletes to row id mapping"); Iterator it = columnarBatch.rowIterator(); int rowId = 0; int currentRowId = 0; diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 1b88d8b134e3..2a7b740e6d4f 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -149,8 +149,10 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableProperties.PARQUET_BATCH_SIZE, "4") // split 7 records to two batches to cover more code paths .commit(); - } else { + } else if (format.equals("parquet")) { // in this case, non-vectorized table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false").commit(); + } else if (format.equals("orc")) { // we only have non-vectorized for orc in our parameters + table.updateProperties().set(TableProperties.ORC_VECTORIZATION_ENABLED, "false").commit(); } return table; } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index b67da43773cd..71118ef2153e 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -35,8 +35,6 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * {@link VectorizedReader} that returns Spark's {@link ColumnarBatch} to support Spark's vectorized @@ -44,7 +42,6 @@ * populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}. */ public class ColumnarBatchReader extends BaseBatchReader { - private static final Logger LOG = LoggerFactory.getLogger(ColumnarBatchReader.class); private final boolean hasIsDeletedColumn; private DeleteFilter deletes = null; private long rowStartPosInBatch = 0; @@ -173,7 +170,6 @@ Pair posDelRowIdMapping() { * @return the mapping array and the new num of rows in a batch, null if no row is deleted */ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) { - LOG.debug("Building row id mapping from positional deletes"); if (deletedRowPositions == null) { return null; } @@ -223,7 +219,6 @@ int[] initEqDeleteRowIdMapping() { * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete */ void applyEqDelete(ColumnarBatch columnarBatch) { - LOG.debug("Applying equality deletes to row id mapping"); Iterator it = columnarBatch.rowIterator(); int rowId = 0; int currentRowId = 0; diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 1b88d8b134e3..2a7b740e6d4f 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -149,8 +149,10 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableProperties.PARQUET_BATCH_SIZE, "4") // split 7 records to two batches to cover more code paths .commit(); - } else { + } else if (format.equals("parquet")) { // in this case, non-vectorized table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false").commit(); + } else if (format.equals("orc")) { // we only have non-vectorized for orc in our parameters + table.updateProperties().set(TableProperties.ORC_VECTORIZATION_ENABLED, "false").commit(); } return table; } From 5837cd65c12b1e68a880a8f107cc414f454c4a74 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Mon, 22 Aug 2022 17:03:17 -0700 Subject: [PATCH 4/8] Count deletes applied in the case where the _deleted metadata column is present. Added tests for the metric for this case too. 
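Whether deleted rows are filtered out or only marked via the _deleted column, the resulting count reaches Spark through the DataSourceV2 metric hooks. A rough sketch of that wiring (not part of this change set; splitCount and counter are assumed fields of the scan and reader implementations):

    // Sketch: the Scan advertises the metrics this source can report.
    @Override
    public CustomMetric[] supportedCustomMetrics() {
      return new CustomMetric[] {new NumSplits(), new NumDeletes()};
    }

    // Sketch: the PartitionReader reports per-task values; the driver aggregates
    // them with CustomMetric#aggregateTaskMetrics.
    @Override
    public CustomTaskMetric[] currentMetricsValues() {
      return new CustomTaskMetric[] {
          new TaskNumSplits(splitCount), new TaskNumDeletes(counter.get())
      };
    }
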
--- .../spark/data/vectorized/ColumnarBatchReader.java | 10 ++++++---- .../org/apache/iceberg/spark/source/BaseReader.java | 5 ++++- .../iceberg/spark/source/TestSparkReaderDeletes.java | 8 ++++++++ .../spark/data/vectorized/ColumnarBatchReader.java | 10 ++++++---- .../org/apache/iceberg/spark/source/BaseReader.java | 5 ++++- .../iceberg/spark/source/TestSparkReaderDeletes.java | 8 ++++++++ 6 files changed, 36 insertions(+), 10 deletions(-) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 71118ef2153e..b1c7d4fa6ca1 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -181,9 +181,10 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) { posDelRowIdMapping[currentRowId] = originalRowId; currentRowId++; - } else if (hasIsDeletedColumn) { - isDeleted[originalRowId] = true; } else { + if (hasIsDeletedColumn) { + isDeleted[originalRowId] = true; + } deletes.incrementDeleteCount(); } originalRowId++; @@ -229,9 +230,10 @@ void applyEqDelete(ColumnarBatch columnarBatch) { // skip deleted rows by pointing to the next undeleted row Id rowIdMapping[currentRowId] = rowIdMapping[rowId]; currentRowId++; - } else if (hasIsDeletedColumn) { - isDeleted[rowIdMapping[rowId]] = true; } else { + if (hasIsDeletedColumn) { + isDeleted[rowIdMapping[rowId]] = true; + } deletes.incrementDeleteCount(); } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 6e2219a8e329..2333cd734bbe 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -268,7 +268,10 @@ protected InputFile getInputFile(String location) { @Override protected void markRowDeleted(InternalRow row) { - row.setBoolean(columnIsDeletedPosition(), true); + if (!row.getBoolean(columnIsDeletedPosition())) { + row.setBoolean(columnIsDeletedPosition(), true); + counter().increment(); + } } } } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 2a7b740e6d4f..d6c1acef78cf 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -357,6 +357,8 @@ public void testPosDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + long expectedDeletes = 4L; + checkDeleteCount(expectedDeletes); } @Test @@ -386,6 +388,8 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + long expectedDeletes = 3L; + checkDeleteCount(expectedDeletes); } @Test @@ -429,6 +433,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() 
throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + long expectedDeletes = 4L; + checkDeleteCount(expectedDeletes); } @Test @@ -501,6 +507,8 @@ public void testIsDeletedColumnWithoutDeleteFile() { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + long expectedDeletes = 0L; + checkDeleteCount(expectedDeletes); } private static final Schema PROJECTION_SCHEMA = diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index 71118ef2153e..b1c7d4fa6ca1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -181,9 +181,10 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) { posDelRowIdMapping[currentRowId] = originalRowId; currentRowId++; - } else if (hasIsDeletedColumn) { - isDeleted[originalRowId] = true; } else { + if (hasIsDeletedColumn) { + isDeleted[originalRowId] = true; + } deletes.incrementDeleteCount(); } originalRowId++; @@ -229,9 +230,10 @@ void applyEqDelete(ColumnarBatch columnarBatch) { // skip deleted rows by pointing to the next undeleted row Id rowIdMapping[currentRowId] = rowIdMapping[rowId]; currentRowId++; - } else if (hasIsDeletedColumn) { - isDeleted[rowIdMapping[rowId]] = true; } else { + if (hasIsDeletedColumn) { + isDeleted[rowIdMapping[rowId]] = true; + } deletes.incrementDeleteCount(); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 6e2219a8e329..2333cd734bbe 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -268,7 +268,10 @@ protected InputFile getInputFile(String location) { @Override protected void markRowDeleted(InternalRow row) { - row.setBoolean(columnIsDeletedPosition(), true); + if (!row.getBoolean(columnIsDeletedPosition())) { + row.setBoolean(columnIsDeletedPosition(), true); + counter().increment(); + } } } } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index 2a7b740e6d4f..d6c1acef78cf 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -357,6 +357,8 @@ public void testPosDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + long expectedDeletes = 4L; + checkDeleteCount(expectedDeletes); } @Test @@ -386,6 +388,8 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", 
expected, actual); + long expectedDeletes = 3L; + checkDeleteCount(expectedDeletes); } @Test @@ -429,6 +433,8 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + long expectedDeletes = 4L; + checkDeleteCount(expectedDeletes); } @Test @@ -501,6 +507,8 @@ public void testIsDeletedColumnWithoutDeleteFile() { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); + long expectedDeletes = 0L; + checkDeleteCount(expectedDeletes); } private static final Schema PROJECTION_SCHEMA = From b91a311301f11abd224095a85136c837385200b6 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Fri, 26 Aug 2022 12:00:20 -0700 Subject: [PATCH 5/8] Address nits. Simplify setting of vectorization properties. Call checkDeleteCount directly. --- .../apache/iceberg/data/DeleteReadTests.java | 29 +++++-------- .../spark/source/TestSparkReaderDeletes.java | 41 +++++++++---------- .../spark/source/TestSparkReaderDeletes.java | 41 +++++++++---------- 3 files changed, 48 insertions(+), 63 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index cdd0268e3977..eb0d96e150b1 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -212,8 +212,7 @@ public void testEqualityDeletes() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = 3L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(3L); } @Test @@ -262,8 +261,7 @@ public void testEqualityDateDeletes() throws IOException { StructLikeSet actual = rowSet(dateTableName, dateTable, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = 3L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(3L); } @Test @@ -295,8 +293,7 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException { "Table should contain expected rows", expected, selectColumns(actual, "id")); } - long expectedDeletes = 3L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(3L); } @Test @@ -331,9 +328,8 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = - 7L; // 3 deletes in the first data file and 4 deletes in the second data file - checkDeleteCount(expectedDeletes); + // 3 deletes in the first data file and 4 deletes in the second data file + checkDeleteCount(7L); } @Test @@ -358,8 +354,7 @@ public void testPositionDeletes() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = 3L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(3L); } @Test @@ -397,8 +392,7 @@ public void testMultiplePosDeleteFiles() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = 3L; - 
checkDeleteCount(expectedDeletes); + checkDeleteCount(3L); } @Test @@ -436,8 +430,7 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = 4L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(4L); } @Test @@ -473,8 +466,7 @@ public void testMultipleEqualityDeleteSchemas() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = 4L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(4L); } @Test @@ -511,8 +503,7 @@ public void testEqualityDeleteByNull() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = 1L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(1L); } private StructLikeSet selectColumns(StructLikeSet rows, String... columns) { diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index d6c1acef78cf..4924f07bf198 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -141,18 +141,20 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); - if (format.equals("parquet") && vectorized) { - table - .updateProperties() - .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") - .set( - TableProperties.PARQUET_BATCH_SIZE, - "4") // split 7 records to two batches to cover more code paths - .commit(); - } else if (format.equals("parquet")) { // in this case, non-vectorized - table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false").commit(); - } else if (format.equals("orc")) { // we only have non-vectorized for orc in our parameters - table.updateProperties().set(TableProperties.ORC_VECTORIZATION_ENABLED, "false").commit(); + if (format.equals("parquet") || format.equals("orc")) { + String vectorizationEnabled = + format.equals("parquet") + ? TableProperties.PARQUET_VECTORIZATION_ENABLED + : TableProperties.ORC_VECTORIZATION_ENABLED; + String batchSize = + format.equals("parquet") + ? 
TableProperties.PARQUET_BATCH_SIZE + : TableProperties.ORC_BATCH_SIZE; + table.updateProperties().set(vectorizationEnabled, String.valueOf(vectorized)).commit(); + if (vectorized) { + // split 7 records to two batches to cover more code paths + table.updateProperties().set(batchSize, "4").commit(); + } } return table; } @@ -326,8 +328,7 @@ public void testPosDeletesAllRowsInBatch() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = 4L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(4L); } @Test @@ -357,8 +358,7 @@ public void testPosDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); - long expectedDeletes = 4L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(4L); } @Test @@ -388,8 +388,7 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); - long expectedDeletes = 3L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(3L); } @Test @@ -433,8 +432,7 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); - long expectedDeletes = 4L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(4L); } @Test @@ -507,8 +505,7 @@ public void testIsDeletedColumnWithoutDeleteFile() { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); - long expectedDeletes = 0L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(0L); } private static final Schema PROJECTION_SCHEMA = diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java index d6c1acef78cf..4924f07bf198 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -141,18 +141,20 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { TableMetadata meta = ops.current(); ops.commit(meta, meta.upgradeToFormatVersion(2)); table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); - if (format.equals("parquet") && vectorized) { - table - .updateProperties() - .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") - .set( - TableProperties.PARQUET_BATCH_SIZE, - "4") // split 7 records to two batches to cover more code paths - .commit(); - } else if (format.equals("parquet")) { // in this case, non-vectorized - table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false").commit(); - } else if (format.equals("orc")) { // we only have non-vectorized for orc in our parameters - table.updateProperties().set(TableProperties.ORC_VECTORIZATION_ENABLED, "false").commit(); + if (format.equals("parquet") || format.equals("orc")) { + String vectorizationEnabled = + format.equals("parquet") + ? 
TableProperties.PARQUET_VECTORIZATION_ENABLED + : TableProperties.ORC_VECTORIZATION_ENABLED; + String batchSize = + format.equals("parquet") + ? TableProperties.PARQUET_BATCH_SIZE + : TableProperties.ORC_BATCH_SIZE; + table.updateProperties().set(vectorizationEnabled, String.valueOf(vectorized)).commit(); + if (vectorized) { + // split 7 records to two batches to cover more code paths + table.updateProperties().set(batchSize, "4").commit(); + } } return table; } @@ -326,8 +328,7 @@ public void testPosDeletesAllRowsInBatch() throws IOException { StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - long expectedDeletes = 4L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(4L); } @Test @@ -357,8 +358,7 @@ public void testPosDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); - long expectedDeletes = 4L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(4L); } @Test @@ -388,8 +388,7 @@ public void testEqualityDeleteWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); - long expectedDeletes = 3L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(3L); } @Test @@ -433,8 +432,7 @@ public void testMixedPosAndEqDeletesWithDeletedColumn() throws IOException { rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); - long expectedDeletes = 4L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(4L); } @Test @@ -507,8 +505,7 @@ public void testIsDeletedColumnWithoutDeleteFile() { StructLikeSet actual = rowSet(tableName, PROJECTION_SCHEMA.asStruct(), "id", "data", "_deleted"); Assert.assertEquals("Table should contain expected row", expected, actual); - long expectedDeletes = 0L; - checkDeleteCount(expectedDeletes); + checkDeleteCount(0L); } private static final Schema PROJECTION_SCHEMA = From e95d9ef08ce3a6358e8399461f96408332bc392a Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Sat, 27 Aug 2022 12:16:30 -0700 Subject: [PATCH 6/8] Minor cleanup. Improve comments for a DeleteReadTests test case. Add blank line after if clause. --- .../test/java/org/apache/iceberg/data/DeleteReadTests.java | 6 +++--- .../iceberg/spark/data/vectorized/ColumnarBatchReader.java | 3 +++ .../iceberg/spark/data/vectorized/ColumnarBatchReader.java | 3 +++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index eb0d96e150b1..8d8c5e531984 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -305,8 +305,9 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { this.dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records); + // At this point, the table has two data files, with 7 and 8 rows respectively, of which all but + // one are in duplicate. table.newAppend().appendFile(dataFile).commit(); - // At this point, the table has 7 + 8 = 15 rows, of which all but one are in duplicate. 
Schema deleteRowSchema = table.schema().select("data"); Record dataDelete = GenericRecord.create(deleteRowSchema); @@ -321,14 +322,13 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { FileHelpers.writeDeleteFile( table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema); + // At this point, 3 rows in the first data file and 4 rows in the second data file are deleted. table.newRowDelta().addDeletes(eqDeletes).commit(); - // At ths point, the table has (7 - 3) + (8 - 4) = 8 rows. 7 rows in all are deleted. StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); - // 3 deletes in the first data file and 4 deletes in the second data file checkDeleteCount(7L); } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index b1c7d4fa6ca1..f07d8c545e35 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -185,6 +185,7 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit if (hasIsDeletedColumn) { isDeleted[originalRowId] = true; } + deletes.incrementDeleteCount(); } originalRowId++; @@ -206,6 +207,7 @@ int[] initEqDeleteRowIdMapping() { eqDeleteRowIdMapping[i] = i; } } + return eqDeleteRowIdMapping; } @@ -234,6 +236,7 @@ void applyEqDelete(ColumnarBatch columnarBatch) { if (hasIsDeletedColumn) { isDeleted[rowIdMapping[rowId]] = true; } + deletes.incrementDeleteCount(); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java index b1c7d4fa6ca1..f07d8c545e35 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java @@ -185,6 +185,7 @@ Pair buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPosit if (hasIsDeletedColumn) { isDeleted[originalRowId] = true; } + deletes.incrementDeleteCount(); } originalRowId++; @@ -206,6 +207,7 @@ int[] initEqDeleteRowIdMapping() { eqDeleteRowIdMapping[i] = i; } } + return eqDeleteRowIdMapping; } @@ -234,6 +236,7 @@ void applyEqDelete(ColumnarBatch columnarBatch) { if (hasIsDeletedColumn) { isDeleted[rowIdMapping[rowId]] = true; } + deletes.incrementDeleteCount(); } From 1d68dbc38049c81fd3ae948446dcb40fea2858bb Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Thu, 1 Sep 2022 17:31:08 -0700 Subject: [PATCH 7/8] Address additional nits. 
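The javadoc added to Deletes below documents the two counting-aware helpers; a minimal caller-side sketch (rows and isDeleted are assumed local names here; the real call sites are in DeleteFilter and the readers):

    // Sketch: drop deleted rows and count how many were removed.
    DeleteCounter counter = new DeleteCounter();
    CloseableIterable<Record> remaining = Deletes.filterDeleted(rows, isDeleted, counter);

    // Sketch: keep every row but flag the deleted ones, e.g. when the _deleted
    // metadata column is projected; counting then happens in the supplied marker.
    CloseableIterable<Record> marked =
        Deletes.markDeleted(rows, isDeleted, row -> row.setField("_deleted", true));
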
--- .../org/apache/iceberg/deletes/Deletes.java | 17 ++++++++++++++++- .../spark/source/metrics/NumDeletes.java | 1 + .../iceberg/spark/source/metrics/NumSplits.java | 1 + .../spark/source/SparkSQLExecutionHelper.java | 3 ++- .../spark/source/metrics/NumDeletes.java | 1 + .../iceberg/spark/source/metrics/NumSplits.java | 1 + .../spark/source/SparkSQLExecutionHelper.java | 3 ++- 7 files changed, 24 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 6485ee05744c..beec06e045d3 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -62,7 +62,14 @@ public static CloseableIterable filter( return equalityFilter.filter(rows); } - /** Returns the same rows that are input, while marking the deleted ones. */ + /** + * Returns the same rows that are input, while marking the deleted ones. + * + * @param rows the rows to process + * @param isDeleted a predicate that determines if a row is deleted + * @param deleteMarker a function that marks a row as deleted + * @return the processed rows + */ public static CloseableIterable markDeleted( CloseableIterable rows, Predicate isDeleted, Consumer deleteMarker) { return CloseableIterable.transform( @@ -71,12 +78,18 @@ public static CloseableIterable markDeleted( if (isDeleted.test(row)) { deleteMarker.accept(row); } + return row; }); } /** * Returns the remaining rows (the ones that are not deleted), while counting the deleted ones. + * + * @param rows the rows to process + * @param isDeleted a predicate that determines if a row is deleted + * @param counter a counter that counts deleted rows + * @return the processed rows */ public static CloseableIterable filterDeleted( CloseableIterable rows, Predicate isDeleted, DeleteCounter counter) { @@ -88,6 +101,7 @@ protected boolean shouldKeep(T item) { if (deleted) { counter.increment(); } + return !deleted; } }; @@ -259,6 +273,7 @@ protected boolean shouldKeep(T item) { if (deleted) { counter.increment(); } + return !deleted; } }; diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java index dd65baf3c95c..5654ae3ed514 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java @@ -41,6 +41,7 @@ public String aggregateTaskMetrics(long[] taskMetrics) { for (int i = 0; i < taskMetrics.length; i++) { sum += taskMetrics[i]; } + return NumberFormat.getIntegerInstance().format(sum); } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java index 96236382b510..b4f0fc7a4462 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java @@ -39,6 +39,7 @@ public String aggregateTaskMetrics(long[] taskMetrics) { for (int i = 0; i < taskMetrics.length; i++) { sum += taskMetrics[i]; } + return NumberFormat.getIntegerInstance().format(sum); } } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java index 5fed8d305dec..3b350bc91e72 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java @@ -53,10 +53,11 @@ public static String lastExecutedMetricValue(SparkSession spark, String metricNa while (lastExecution.metricValues() == null && attempts > 0) { try { Thread.sleep(100); - attempts -= 1; + attempts--; } catch (InterruptedException e) { throw new RuntimeException(e); } + lastExecution = statusStore.execution(lastExecution.executionId()).get(); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java index dd65baf3c95c..5654ae3ed514 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumDeletes.java @@ -41,6 +41,7 @@ public String aggregateTaskMetrics(long[] taskMetrics) { for (int i = 0; i < taskMetrics.length; i++) { sum += taskMetrics[i]; } + return NumberFormat.getIntegerInstance().format(sum); } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java index 96236382b510..b4f0fc7a4462 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/metrics/NumSplits.java @@ -39,6 +39,7 @@ public String aggregateTaskMetrics(long[] taskMetrics) { for (int i = 0; i < taskMetrics.length; i++) { sum += taskMetrics[i]; } + return NumberFormat.getIntegerInstance().format(sum); } } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java index 5fed8d305dec..3b350bc91e72 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/SparkSQLExecutionHelper.java @@ -53,10 +53,11 @@ public static String lastExecutedMetricValue(SparkSession spark, String metricNa while (lastExecution.metricValues() == null && attempts > 0) { try { Thread.sleep(100); - attempts -= 1; + attempts--; } catch (InterruptedException e) { throw new RuntimeException(e); } + lastExecution = statusStore.execution(lastExecution.executionId()).get(); } From 096ff8446b91d181ec9e3bb0e6f48ced27806a28 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Thu, 1 Sep 2022 18:17:06 -0700 Subject: [PATCH 8/8] Update ChangelogRowReader following rebase on master. 
--- .../org/apache/iceberg/spark/source/ChangelogRowReader.java | 4 ++-- .../org/apache/iceberg/spark/source/ChangelogRowReader.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index 82ab8f360ef0..20f0893bcca3 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -91,13 +91,13 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes()); + SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); return deletes.filter(rows(task, deletes.requiredSchema())); } private CloseableIterable openDeletedDataFileScanTask(DeletedDataFileScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes()); + SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); return deletes.filter(rows(task, deletes.requiredSchema())); } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index 82ab8f360ef0..20f0893bcca3 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -91,13 +91,13 @@ private CloseableIterable openChangelogScanTask(ChangelogScanTask t CloseableIterable openAddedRowsScanTask(AddedRowsScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes()); + SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter()); return deletes.filter(rows(task, deletes.requiredSchema())); } private CloseableIterable openDeletedDataFileScanTask(DeletedDataFileScanTask task) { String filePath = task.file().path().toString(); - SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes()); + SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter()); return deletes.filter(rows(task, deletes.requiredSchema())); }
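
Putting the pieces together, the per-reader flow that feeds the delete metric looks roughly like the sketch below (assumed names: filePath, task, and rows(...) stand in for the reader's existing plumbing):

    // Sketch: one DeleteCounter per reader, threaded through the delete filter; the
    // value it accumulates is what TaskNumDeletes later reports back to the driver.
    DeleteCounter counter = new DeleteCounter();
    SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter);
    CloseableIterable<InternalRow> remainingRows =
        deleteFilter.filter(rows(task, deleteFilter.requiredSchema()));
    // Consume remainingRows, then expose counter.get() via new TaskNumDeletes(counter.get()).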