From f55c89bddd602398cddac92a595ab389428cb893 Mon Sep 17 00:00:00 2001
From: Wing Yew Poon
Date: Mon, 24 Oct 2022 22:40:01 -0700
Subject: [PATCH] Spark 3.1: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly

---
 .../iceberg/spark/source/BatchDataReader.java |  7 +-
 .../spark/source/TestSparkReaderDeletes.java  | 93 ++++++++++++++++++-
 2 files changed, 97 insertions(+), 3 deletions(-)

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 35d0a9cbac23..68e98ba913b7 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -77,14 +77,17 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
     if (task.file().format() == FileFormat.PARQUET) {
       SparkDeleteFilter deleteFilter = deleteFilter(task);
 
+      // get required schema if there are deletes
+      Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema;
+
       Parquet.ReadBuilder builder =
           Parquet.read(location)
-              .project(expectedSchema)
+              .project(requiredSchema)
               .split(task.start(), task.length())
               .createBatchedReaderFunc(
                   fileSchema ->
                       VectorizedSparkParquetReaders.buildReader(
-                          expectedSchema,
+                          requiredSchema,
                           fileSchema,
                           /* setArrowValidityVector */ NullCheckingForGet.NULL_CHECKING_ENABLED,
                           idToConstant,
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 575555d745c2..bbc4171b4214 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -20,12 +20,17 @@
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
@@ -45,19 +50,28 @@
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkStructLike;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.TableScanUtil;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -117,16 +131,26 @@ public static void stopMetastoreAndSpark() throws Exception {
     spark = null;
   }
 
+  @After
+  @Override
+  public void cleanup() throws IOException {
+    super.cleanup();
+    dropTable("test3");
+  }
+
   @Override
   protected Table createTable(String name, Schema schema, PartitionSpec spec) {
     Table table = catalog.createTable(TableIdentifier.of("default", name), schema);
     TableOperations ops = ((BaseTable) table).operations();
     TableMetadata meta = ops.current();
     ops.commit(meta, meta.upgradeToFormatVersion(2));
+    table
+        .updateProperties()
+        .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized))
+        .commit();
     if (vectorized) {
       table
           .updateProperties()
-          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
           .set(
               TableProperties.PARQUET_BATCH_SIZE,
               "4") // split 7 records to two batches to cover more code paths
@@ -293,4 +317,71 @@ public void testPosDeletesAllRowsInBatch() throws IOException {
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
   }
+
+  @Test
+  public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException {
+    String tblName = "test3";
+    Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Path> fileSplits = Lists.newArrayList();
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Configuration conf = new Configuration();
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+
+    // Write a Parquet file with more than one row group
+    ParquetFileWriter parquetFileWriter =
+        new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath);
+    parquetFileWriter.start();
+    for (int i = 0; i < 2; i += 1) {
+      File split = temp.newFile();
+      Assert.assertTrue("Delete should succeed", split.delete());
+      Path splitPath = new Path(split.getAbsolutePath());
+      fileSplits.add(splitPath);
+      try (FileAppender<InternalRow> writer =
+          Parquet.write(Files.localOutput(split))
+              .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkSchema, msgType))
+              .schema(SCHEMA)
+              .overwrite()
+              .build()) {
+        Iterable<InternalRow> records = RandomData.generateSpark(SCHEMA, 100, 34 * i + 37);
+        writer.addAll(records);
+      }
+      parquetFileWriter.appendFile(
+          org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(splitPath, conf));
+    }
+    parquetFileWriter.end(
+        ParquetFileWriter.mergeMetadataFiles(fileSplits, conf)
+            .getFileMetaData()
+            .getKeyValueMetaData());
+
+    // Add the file to the table
+    DataFile dataFile =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withInputFile(org.apache.iceberg.hadoop.HadoopInputFile.fromPath(testFilePath, conf))
+            .withFormat("parquet")
+            .withRecordCount(200)
+            .build();
+    tbl.newAppend().appendFile(dataFile).commit();
+
+    // Add positional deletes to the table
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile.path(), 97L),
+            Pair.of(dataFile.path(), 98L),
+            Pair.of(dataFile.path(), 99L),
+            Pair.of(dataFile.path(), 101L),
+            Pair.of(dataFile.path(), 103L),
+            Pair.of(dataFile.path(), 107L),
+            Pair.of(dataFile.path(), 109L));
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), deletes);
+    tbl.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    Assert.assertEquals(193, rowSet(tblName, tbl, "*").size());
+  }
 }
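
Note (not part of the patch): the new test appends two 100-row splits into one data file, so rows 0-99 land in the first row group and rows 100-199 in the second, and the positional deletes at 97, 98, 99, 101, 103, 107, and 109 straddle that row-group boundary. The sketch below only illustrates the position arithmetic the vectorized read path has to get right; the class and variable names are invented for this example and do not correspond to Iceberg's actual ColumnarBatchReader internals.

    // PositionalDeleteArithmeticSketch.java -- illustrative sketch only; names are invented.
    // It mirrors the layout the new test creates: two appended 100-row splits form two row
    // groups in one data file, with deletes on both sides of the boundary at row 100.
    public class PositionalDeleteArithmeticSketch {
      public static void main(String[] args) {
        long totalRows = 200L; // 2 splits x 100 random rows
        long[] deletedPositions = {97L, 98L, 99L, 101L, 103L, 107L, 109L};

        // Positional deletes identify rows by their absolute position in the data file, so a
        // batch reader must offset each batch-local index by the starting position of the
        // current row group (the rowStartPosInBatch named in the commit title). If that offset
        // stayed at 0 for the second row group, the deletes at 101/103/107/109 would match
        // rows 1/3/7/9 of that group instead of the intended rows.
        long secondRowGroupStart = 100L; // absolute position of the first row in row group 2
        int batchLocalIndex = 3;         // example: fourth row of a batch in row group 2
        long absolutePosition = secondRowGroupStart + batchLocalIndex; // 103, a deleted row

        long expectedRows = totalRows - deletedPositions.length; // 200 - 7 = 193
        System.out.println("absolute position of example row: " + absolutePosition);
        System.out.println("rows expected after deletes: " + expectedRows);
      }
    }

The 193 here is the same count the test asserts via rowSet(tblName, tbl, "*").size() after the row delta commits.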