diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 45bf3cfcc86a..a6f02bddcb9e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -79,13 +79,8 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
       Expression residual,
       Map<Integer, ?> idToConstant,
       SparkDeleteFilter deleteFilter) {
-    // get required schema for filtering out equality-delete rows in case equality-delete uses
-    // columns are
-    // not selected.
-    Schema requiredSchema =
-        deleteFilter != null && deleteFilter.hasEqDeletes()
-            ? deleteFilter.requiredSchema()
-            : expectedSchema();
+    // get required schema if there are deletes
+    Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
 
     return Parquet.read(inputFile)
         .project(requiredSchema)
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 4924f07bf198..d1d85790868e 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -22,13 +22,18 @@
 import static org.apache.iceberg.spark.source.SparkSQLExecutionHelper.lastExecutedMetricValue;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.MetadataColumns;
@@ -50,11 +55,16 @@
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkStructLike;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.spark.source.metrics.NumDeletes;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
@@ -62,13 +72,18 @@
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.TableScanUtil;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -134,6 +149,13 @@ public static void stopMetastoreAndSpark() throws Exception {
     spark = null;
   }
 
+  @After
+  @Override
+  public void cleanup() throws IOException {
+    super.cleanup();
+    dropTable("test3");
+  }
+
   @Override
   protected Table createTable(String name, Schema schema, PartitionSpec spec) {
     Table table = catalog.createTable(TableIdentifier.of("default", name), schema);
@@ -508,6 +530,75 @@ public void testIsDeletedColumnWithoutDeleteFile() {
     checkDeleteCount(0L);
   }
 
+  @Test
+  public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException {
+    Assume.assumeTrue(format.equals("parquet"));
+
+    String tblName = "test3";
+    Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Path> fileSplits = Lists.newArrayList();
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Configuration conf = new Configuration();
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+
+    // Write a Parquet file with more than one row group
+    ParquetFileWriter parquetFileWriter =
+        new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath);
+    parquetFileWriter.start();
+    for (int i = 0; i < 2; i += 1) {
+      File split = temp.newFile();
+      Assert.assertTrue("Delete should succeed", split.delete());
+      Path splitPath = new Path(split.getAbsolutePath());
+      fileSplits.add(splitPath);
+      try (FileAppender<InternalRow> writer =
+          Parquet.write(Files.localOutput(split))
+              .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkSchema, msgType))
+              .schema(SCHEMA)
+              .overwrite()
+              .build()) {
+        Iterable<InternalRow> records = RandomData.generateSpark(SCHEMA, 100, 34 * i + 37);
+        writer.addAll(records);
+      }
+      parquetFileWriter.appendFile(
+          org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(splitPath, conf));
+    }
+    parquetFileWriter.end(
+        ParquetFileWriter.mergeMetadataFiles(fileSplits, conf)
+            .getFileMetaData()
+            .getKeyValueMetaData());
+
+    // Add the file to the table
+    DataFile dataFile =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withInputFile(org.apache.iceberg.hadoop.HadoopInputFile.fromPath(testFilePath, conf))
+            .withFormat("parquet")
+            .withRecordCount(200)
+            .build();
+    tbl.newAppend().appendFile(dataFile).commit();
+
+    // Add positional deletes to the table
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile.path(), 97L),
+            Pair.of(dataFile.path(), 98L),
+            Pair.of(dataFile.path(), 99L),
+            Pair.of(dataFile.path(), 101L),
+            Pair.of(dataFile.path(), 103L),
+            Pair.of(dataFile.path(), 107L),
+            Pair.of(dataFile.path(), 109L));
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), deletes);
+    tbl.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    Assert.assertEquals(193, rowSet(tblName, tbl, "*").size());
+  }
+
   private static final Schema PROJECTION_SCHEMA =
       new Schema(
           required(1, "id", Types.IntegerType.get()),